From 390a527ee2e652a5f5e14d6eb94843195322d509 Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Fri, 15 Apr 2022 15:04:59 +0200 Subject: [PATCH 1/4] Add ability to skip the rest of the processors in a pipeline --- .../ingest/CompoundProcessor.java | 8 +- .../elasticsearch/ingest/IngestDocument.java | 14 ++++ .../ingest/CompoundProcessorTests.java | 79 +++++++++++++++++++ 3 files changed, 98 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java index a70f32b77e679..e7b2771ba6bb4 100644 --- a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java @@ -148,7 +148,7 @@ public void execute(IngestDocument ingestDocument, BiConsumer handler) { - if (currentProcessor == processorsWithMetrics.size()) { + if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isSkipCurrentPipeline()) { handler.accept(ingestDocument, null); return; } @@ -158,7 +158,9 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, BiConsume IngestMetric metric; long startTimeInNanos = 0; // iteratively execute any sync processors - while (currentProcessor < processorsWithMetrics.size() && processorsWithMetrics.get(currentProcessor).v1().isAsync() == false) { + while (currentProcessor < processorsWithMetrics.size() + && processorsWithMetrics.get(currentProcessor).v1().isAsync() == false + && ingestDocument.isSkipCurrentPipeline() == false) { processorWithMetric = processorsWithMetrics.get(currentProcessor); processor = processorWithMetric.v1(); metric = processorWithMetric.v2(); @@ -182,7 +184,7 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, BiConsume currentProcessor++; } - if (currentProcessor >= processorsWithMetrics.size()) { + if (currentProcessor >= processorsWithMetrics.size() || ingestDocument.isSkipCurrentPipeline()) { handler.accept(ingestDocument, null); } else { final int finalCurrentProcessor = currentProcessor + 1; diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index 9c1214196c4a0..b18812f2d2654 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -55,6 +55,7 @@ public final class IngestDocument { // Contains all pipelines that have been executed for this document private final Set executedPipelines = new LinkedHashSet<>(); + private boolean skipCurrentPipeline = false; public IngestDocument(String index, String id, String routing, Long version, VersionType versionType, Map source) { // source + at max 5 extra fields @@ -821,6 +822,7 @@ public void executePipeline(Pipeline pipeline, BiConsumer { + skipCurrentPipeline = false; executedPipelines.remove(pipeline.getId()); if (previousPipeline != null) { ingestMetadata.put("pipeline", previousPipeline); @@ -843,6 +845,18 @@ List getPipelineStack() { return pipelineStack; } + /** + * Skips the remaining processors in the current pipeline, except for on failure processors. + * If the current pipeline is executed via a pipeline processor, the caller pipeline will not be skipped. + */ + public void skipCurrentPipeline() { + this.skipCurrentPipeline = true; + } + + boolean isSkipCurrentPipeline() { + return skipCurrentPipeline; + } + @Override public boolean equals(Object obj) { if (obj == this) { diff --git a/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java index 2c01f08a42c3b..fdf4dce870770 100644 --- a/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java @@ -540,6 +540,85 @@ public void testMultipleProcessorsDoNotIgnoreFailures() { } } + public void testSkipPipeline() throws Exception { + TestProcessor processor1 = new TestProcessor(IngestDocument::skipCurrentPipeline); + TestProcessor processor2 = new TestProcessor(new RuntimeException("this processor was expected to be skipped")); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); + CompoundProcessor compoundProcessor = new CompoundProcessor( + false, + List.of(processor1, processor2), + Collections.emptyList(), + relativeTimeProvider + ); + executeCompound(compoundProcessor, ingestDocument, (result, e) -> {}); + assertThat(processor1.getInvokedCounter(), equalTo(1)); + assertStats(0, compoundProcessor, 0, 1, 0, 0); + assertThat(processor2.getInvokedCounter(), equalTo(0)); + assertStats(1, compoundProcessor, 0, 0, 0, 0); + } + + public void testSkipAsyncProcessor() throws Exception { + TestProcessor processor1 = new TestProcessor(IngestDocument::skipCurrentPipeline) { + @Override + public boolean isAsync() { + return true; + } + }; + TestProcessor processor2 = new TestProcessor(new RuntimeException("this processor was expected to be skipped")); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); + CompoundProcessor compoundProcessor = new CompoundProcessor( + false, + List.of(processor1, processor2), + Collections.emptyList(), + relativeTimeProvider + ); + executeCompound(compoundProcessor, ingestDocument, (result, e) -> {}); + assertThat(processor1.getInvokedCounter(), equalTo(1)); + assertStats(0, compoundProcessor, 0, 1, 0, 0); + assertThat(processor2.getInvokedCounter(), equalTo(0)); + assertStats(1, compoundProcessor, 0, 0, 0, 0); + } + + public void testSkipProcessorIgnoreFailure() throws Exception { + TestProcessor processor1 = new TestProcessor(doc -> { + doc.skipCurrentPipeline(); + throw new RuntimeException("simulate processor failure after calling skipCurrentPipeline()"); + }); + TestProcessor processor2 = new TestProcessor(doc -> {}); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); + CompoundProcessor compoundProcessor = new CompoundProcessor(true, List.of(processor1, processor2), List.of(), relativeTimeProvider); + executeCompound(compoundProcessor, ingestDocument, (result, e) -> {}); + assertThat(processor1.getInvokedCounter(), equalTo(1)); + assertStats(0, compoundProcessor, 0, 1, 1, 0); + assertThat(processor2.getInvokedCounter(), equalTo(0)); + assertStats(1, compoundProcessor, 0, 0, 0, 0); + } + + public void testDontSkipFailureProcessor() throws Exception { + TestProcessor processor = new TestProcessor(doc -> { + doc.skipCurrentPipeline(); + throw new RuntimeException("simulate processor failure after calling skipCurrentPipeline()"); + }); + TestProcessor failureProcessor1 = new TestProcessor(doc -> {}); + TestProcessor failureProcessor2 = new TestProcessor(doc -> {}); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); + CompoundProcessor compoundProcessor = new CompoundProcessor( + false, + List.of(processor), + List.of(failureProcessor1, failureProcessor2), + relativeTimeProvider + ); + executeCompound(compoundProcessor, ingestDocument, (result, e) -> {}); + assertThat(processor.getInvokedCounter(), equalTo(1)); + assertStats(0, compoundProcessor, 0, 1, 1, 0); + assertThat(failureProcessor1.getInvokedCounter(), equalTo(1)); + assertThat(failureProcessor2.getInvokedCounter(), equalTo(1)); + } + private TestProcessor getTestProcessor(String tag, boolean isAsync, boolean shouldThrowException) { return new TestProcessor( tag, From 41f31ad5b360230c6f76e356afed46fca680581a Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Mon, 27 Jun 2022 09:43:17 +0200 Subject: [PATCH 2/4] Update docs/changelog/85932.yaml --- docs/changelog/85932.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/85932.yaml diff --git a/docs/changelog/85932.yaml b/docs/changelog/85932.yaml new file mode 100644 index 0000000000000..db61abcc475a3 --- /dev/null +++ b/docs/changelog/85932.yaml @@ -0,0 +1,5 @@ +pr: 85932 +summary: Add ability to skip the rest of the processors in a pipeline +area: Ingest Node +type: enhancement +issues: [] From 43e49562f8ddaa1f634ee21739e2fb72420c3a83 Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Tue, 21 Feb 2023 17:45:12 +0100 Subject: [PATCH 3/4] Fix test compilation error --- .../main/java/org/elasticsearch/ingest/IngestDocument.java | 2 +- .../java/org/elasticsearch/ingest/CompoundProcessorTests.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index 42c778a0cef37..bffecd2cfe990 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -861,7 +861,7 @@ List getPipelineStack() { Collections.reverse(pipelineStack); return pipelineStack; } - + /** * @return Whether a self referencing check should be performed */ diff --git a/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java index a2ab08284b489..1426200383171 100644 --- a/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java @@ -540,7 +540,7 @@ public void testSkipPipeline() throws Exception { CompoundProcessor compoundProcessor = new CompoundProcessor( false, List.of(processor1, processor2), - Collections.emptyList(), + List.of(), relativeTimeProvider ); executeCompound(compoundProcessor, ingestDocument, (result, e) -> {}); @@ -563,7 +563,7 @@ public boolean isAsync() { CompoundProcessor compoundProcessor = new CompoundProcessor( false, List.of(processor1, processor2), - Collections.emptyList(), + List.of(), relativeTimeProvider ); executeCompound(compoundProcessor, ingestDocument, (result, e) -> {}); From 74b6cace9fe13f486f045a09acd2927aeaf7ad8a Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Tue, 21 Feb 2023 17:50:39 +0100 Subject: [PATCH 4/4] Apply spotless --- .../main/java/org/elasticsearch/ingest/CompoundProcessor.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java index 5f229d4bd40d8..459e6c084e798 100644 --- a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java @@ -139,7 +139,6 @@ public void execute(IngestDocument ingestDocument, BiConsumer handler) { assert currentProcessor <= processorsWithMetrics.size(); if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isSkipCurrentPipeline()) {