Add reroute processor #76511

Merged (40 commits, Apr 18, 2023)
Changes from 9 commits

Commits (40):
2d80949
Add data_stream_router processor
felixbarny Apr 15, 2022
1ede9bc
Don't override event.dataset
felixbarny Apr 27, 2022
eba46c6
Merge remote-tracking branch 'origin/main' into data_stream_router_pr…
felixbarny Jun 27, 2022
39b57ee
Update docs/changelog/76511.yaml
felixbarny Jun 27, 2022
6751fc2
Merge branch 'main' into data_stream_router_processor
felixbarny Feb 21, 2023
3e25d0e
Use IngestDocument#redirect
felixbarny Feb 22, 2023
c31b51f
Extract data stream parsing methods
felixbarny Feb 22, 2023
ac5f282
Add changelog
felixbarny Feb 22, 2023
d4bc1ad
Revert "Add changelog"
felixbarny Feb 22, 2023
afa1527
Merge remote-tracking branch 'origin/main' into data_stream_router_pr…
felixbarny Mar 1, 2023
c76d5da
Rename processor to reroute and add destination option
felixbarny Mar 1, 2023
b514a6c
Fix docs build
felixbarny Mar 1, 2023
6c44e35
Add support for field references and multiple fallbacks
felixbarny Mar 4, 2023
4888843
Add skip_if_target_unchanged option
felixbarny Mar 9, 2023
26b3c79
Use mustache scripts instead of custom field reference syntax
felixbarny Mar 19, 2023
88a7d2a
Merge remote-tracking branch 'origin/main' into data_stream_router_pr…
felixbarny Mar 22, 2023
2d02830
Revert "Use mustache scripts instead of custom field reference syntax"
felixbarny Mar 27, 2023
584f8c5
Set event.dataset to be consistent with data_stream.dataset
felixbarny Mar 27, 2023
60df140
Revert "Add skip_if_target_unchanged option"
felixbarny Mar 27, 2023
a312432
Field references to use same syntax as mustache templates
felixbarny Mar 28, 2023
7dbd47e
Add comments to RerouteProcessor
felixbarny Mar 30, 2023
c8cdaae
Use {{data_stream.dataset}} and {{data_sream.namespace}} as default v…
felixbarny Mar 30, 2023
24aaab9
Update docs
felixbarny Mar 30, 2023
19344bb
Merge remote-tracking branch 'origin/main' into data_stream_router_pr…
felixbarny Mar 30, 2023
1d454b6
Apply spotless suggestions
felixbarny Mar 30, 2023
5904db6
Merge branch 'main' into data_stream_router_processor
felixbarny Apr 11, 2023
d043ae3
Update docs/reference/ingest/processors/reroute.asciidoc
felixbarny Apr 11, 2023
b2b9e7a
Merge branch 'main' into data_stream_router_processor
joegallo Apr 12, 2023
d5491de
Tidy up imports
joegallo Apr 11, 2023
e3c3ad1
Prefer toList
joegallo Apr 11, 2023
7cc0a99
Add a comment
joegallo Apr 11, 2023
7647f9e
Drop a utility method, add a test
joegallo Apr 11, 2023
e43ac5f
Merge branch 'main' into data_stream_router_processor
elasticmachine Apr 12, 2023
0ef9503
Add sanitization tests
felixbarny Apr 12, 2023
7553638
Move sanitization constants
felixbarny Apr 13, 2023
c49e70b
Use regex for sanitization
felixbarny Apr 13, 2023
0c29f73
Drop toString
joegallo Apr 11, 2023
55177e1
Merge remote-tracking branch 'origin/main' into data_stream_router_pr…
felixbarny Apr 13, 2023
e13451e
Be more strict regarding non-string field references
felixbarny Apr 18, 2023
dbb7dd6
Merge remote-tracking branch 'origin/main' into data_stream_router_pr…
felixbarny Apr 18, 2023
5 changes: 5 additions & 0 deletions docs/changelog/76511.yaml
@@ -0,0 +1,5 @@
pr: 76511
summary: Add `data_stream_router` processor
area: Ingest Node
type: enhancement
issues: []
1 change: 1 addition & 0 deletions docs/reference/ingest/processors.asciidoc
@@ -39,6 +39,7 @@
include::processors/circle.asciidoc[]
include::processors/community-id.asciidoc[]
include::processors/convert.asciidoc[]
include::processors/csv.asciidoc[]
include::processors/data-stream-router.asciidoc[]
include::processors/date.asciidoc[]
include::processors/date-index-name.asciidoc[]
include::processors/dissect.asciidoc[]
43 changes: 43 additions & 0 deletions docs/reference/ingest/processors/data-stream-router.asciidoc
@@ -0,0 +1,43 @@
[[data-stream-router-processor]]
=== Data stream router processor
++++
<titleabbrev>Data stream router</titleabbrev>
++++

The `data_stream_router` processor routes a document from one data stream to another.
It can use either static values or values from the document to determine the target data stream.

The name of a data stream consists of three parts: `<type>-<dataset>-<namespace>`.
See the {fleet-guide}/data-streams.html#data-streams-naming-scheme[data stream naming scheme] documentation for more details.

NOTE: The `data_stream_router` processor can only be used on data streams that follow the data stream naming scheme.
Trying to use this processor on a data stream with a non-compliant name raises an exception.

After a `data_stream_router` processor has been executed, all other processors of the current pipeline are skipped.
This means that at most one `data_stream_router` processor is ever executed within a pipeline,
which allows you to define mutually exclusive routing conditions,
similar to an if, else-if, else-if, … construct.
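
For example, a sketch (not part of the original docs) of a pipeline with mutually exclusive routing conditions; the first `data_stream_router` processor whose `if` condition matches determines the target data stream, and the remaining processors are skipped:

[source,js]
--------------------------------------------------
{
  "processors": [
    {
      "data_stream_router": {
        "if": "ctx?.log?.file?.path?.contains('nginx')",
        "dataset": "nginx"
      }
    },
    {
      "data_stream_router": {
        "if": "ctx?.log?.file?.path?.contains('apache')",
        "dataset": "apache"
      }
    }
  ]
}
--------------------------------------------------
// NOTCONSOLE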

[[data-stream-router-options]]
.Data stream router options
[options="header"]
|======
| Name | Required | Default | Description
| `dataset` | no | - | A static value for the dataset part of the data stream name. In addition to the criteria for <<indices-create-api-path-params, index names>>, cannot contain `-` and must be no longer than 100 characters. Example values are `nginx.access` and `nginx.error`. If not set, gets the value of the field `data_stream.dataset` from the document. When using values from the document, the processor replaces invalid characters with `_`. If the option is not set and the document also doesn't contain a corresponding field, it uses the `<dataset>` part of the index name as a fallback.
| `namespace` | no | - | A static value for the namespace part of the data stream name. See the criteria for <<indices-create-api-path-params, index names>> for allowed characters. Must be no longer than 100 characters. If not set, gets the value of the field `data_stream.namespace` from the document. When using values from the document, the processor replaces invalid characters with `_`. If the option is not set and the document also doesn't contain a corresponding field, it uses the `<namespace>` part of the index name as a fallback.
include::common-options.asciidoc[]
|======

NOTE: It's not possible to change the `type` of the data stream by setting the `data_stream.type` in the document.

[source,js]
--------------------------------------------------
{
"data_stream_router": {
"tag": "nginx",
"if" : "ctx?.log?.file?.path?.contains('nginx')",
"dataset": "nginx"
}
}
--------------------------------------------------
// NOTCONSOLE
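
For illustration (a sketch inferred from the processor's behavior, not from the original docs): a document ingested into `logs-generic-default` whose `log.file.path` contains `nginx` would keep the `logs` type, get the static `nginx` dataset, fall back to the `default` namespace from the index name, and be redirected to `logs-nginx-default`:

[source,js]
--------------------------------------------------
{
  "_index": "logs-nginx-default",
  "_source": {
    "log": { "file": { "path": "/var/log/nginx/access.log" } },
    "data_stream": {
      "type": "logs",
      "dataset": "nginx",
      "namespace": "default"
    }
  }
}
--------------------------------------------------
// NOTCONSOLE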
@@ -0,0 +1,157 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.ingest.common;

import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;

import java.util.Locale;
import java.util.Map;
import java.util.Objects;

import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;

public final class DataStreamRouterProcessor extends AbstractProcessor {
public static final String TYPE = "data_stream_router";

private static final String DATA_STREAM_PREFIX = "data_stream.";
private static final String DATA_STREAM_TYPE = DATA_STREAM_PREFIX + "type";
private static final String DATA_STREAM_DATASET = DATA_STREAM_PREFIX + "dataset";
private static final String DATA_STREAM_NAMESPACE = DATA_STREAM_PREFIX + "namespace";
private static final char[] DISALLOWED_IN_DATASET = new char[] { '\\', '/', '*', '?', '\"', '<', '>', '|', ' ', ',', '#', ':', '-' };
Contributor:

Can you make sure this is aligned with what we have in https://github.com/elastic/package-spec for validation? @mtojek will likely know where to point you to. Same for namespace.

Member (author):

I don't think the package spec currently defines the allowed characters for `data_stream.*` fields. See also the spec for dataset. I've just tried using invalid characters in the manifest of an integration, such as upper-case chars and `-`. Neither `elastic-package lint` nor `elastic-package build` yielded an error.

I took the validation rules from https://github.com/elastic/ecs/blob/main/rfcs/text/0009-data_stream-fields.md#restrictions-on-values and https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html#indices-create-api-path-params.

Contributor:

@mtojek @jsoriano Can you chime in here? I would have expected that we do some validation.

Member:

We have had this issue open since long ago (elastic/package-spec#57); I will ensure this ends up in our backlog.

private static final char[] DISALLOWED_IN_NAMESPACE = new char[] { '\\', '/', '*', '?', '\"', '<', '>', '|', ' ', ',', '#', ':' };
private static final int MAX_LENGTH = 100;
private static final char REPLACEMENT_CHAR = '_';
private final String dataset;
private final String namespace;

DataStreamRouterProcessor(String tag, String description, String dataset, String namespace) {
super(tag, description);
this.dataset = dataset;
this.namespace = namespace;
}

private static String sanitizeDataStreamField(String s, char[] disallowedInDataset) {
if (s == null) {
return null;
}
s = s.toLowerCase(Locale.ROOT);
s = s.substring(0, Math.min(s.length(), MAX_LENGTH));
for (char c : disallowedInDataset) {
s = s.replace(c, REPLACEMENT_CHAR);
}
return s;
}
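
As a quick illustration of the order of operations in `sanitizeDataStreamField` (lower-case first, then truncate to 100 characters, then replace disallowed characters), here is a standalone sketch; the class and method names are illustrative, not part of the processor:

```java
import java.util.Locale;

// Standalone sketch of the sanitization logic above; names are illustrative.
public class DataStreamFieldSanitizer {
    private static final int MAX_LENGTH = 100;
    private static final char REPLACEMENT_CHAR = '_';
    // Same disallowed set as DISALLOWED_IN_DATASET in the processor.
    private static final char[] DISALLOWED = { '\\', '/', '*', '?', '"', '<', '>', '|', ' ', ',', '#', ':', '-' };

    static String sanitize(String s) {
        if (s == null) {
            return null;
        }
        s = s.toLowerCase(Locale.ROOT);                       // 1. lower-case
        s = s.substring(0, Math.min(s.length(), MAX_LENGTH)); // 2. truncate
        for (char c : DISALLOWED) {
            s = s.replace(c, REPLACEMENT_CHAR);               // 3. replace
        }
        return s;
    }

    public static void main(String[] args) {
        System.out.println(sanitize("Nginx-Access Logs")); // nginx_access_logs
    }
}
```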

@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
final String indexName = ingestDocument.getFieldValue(IngestDocument.Metadata.INDEX.getFieldName(), String.class);
final String type;
final String datasetFallback;
final String namespaceFallback;
int indexOfFirstDash = indexName.indexOf('-');
if (indexOfFirstDash < 0) {
throw createInvalidDataStreamNameException(indexName);
}
int indexOfSecondDash = indexName.indexOf('-', indexOfFirstDash + 1);
if (indexOfSecondDash < 0) {
throw createInvalidDataStreamNameException(indexName);
}
type = parseDataStreamType(indexName, indexOfFirstDash);
datasetFallback = parseDataStreamDataset(indexName, indexOfFirstDash, indexOfSecondDash);
namespaceFallback = parseDataStreamNamespace(indexName, indexOfSecondDash);

String dataset = getDataset(ingestDocument, datasetFallback);
String namespace = getNamespace(ingestDocument, namespaceFallback);
ingestDocument.setFieldValue(DATA_STREAM_TYPE, type);
ingestDocument.setFieldValue(DATA_STREAM_DATASET, dataset);
ingestDocument.setFieldValue(DATA_STREAM_NAMESPACE, namespace);
ingestDocument.redirect(type + "-" + dataset + "-" + namespace);
return ingestDocument;
}

private static IllegalArgumentException createInvalidDataStreamNameException(String indexName) {
return new IllegalArgumentException(
"invalid data stream name: [" + indexName + "]; must follow naming scheme <type>-<dataset>-<namespace>"
);
}

private static String parseDataStreamType(String dataStreamName, int indexOfFirstDash) {
return dataStreamName.substring(0, indexOfFirstDash);
}

private static String parseDataStreamDataset(String dataStreamName, int indexOfFirstDash, int indexOfSecondDash) {
return dataStreamName.substring(indexOfFirstDash + 1, indexOfSecondDash);
}

private static String parseDataStreamNamespace(String dataStreamName, int indexOfSecondDash) {
return dataStreamName.substring(indexOfSecondDash + 1);
}
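
The three parse methods above split a name on its first two dashes only, so the namespace part may itself contain dashes. A standalone sketch of that behavior (illustrative names, not the processor's API):

```java
// Standalone sketch of the index-name parsing in execute() above:
// a valid name <type>-<dataset>-<namespace> is split on the first two dashes.
public class DataStreamNameParser {
    record Parts(String type, String dataset, String namespace) {}

    static Parts parse(String indexName) {
        int firstDash = indexName.indexOf('-');
        int secondDash = firstDash < 0 ? -1 : indexName.indexOf('-', firstDash + 1);
        if (firstDash < 0 || secondDash < 0) {
            throw new IllegalArgumentException(
                "invalid data stream name: [" + indexName + "]; must follow naming scheme <type>-<dataset>-<namespace>"
            );
        }
        return new Parts(
            indexName.substring(0, firstDash),
            indexName.substring(firstDash + 1, secondDash),
            indexName.substring(secondDash + 1)   // namespace keeps any further dashes
        );
    }

    public static void main(String[] args) {
        System.out.println(parse("logs-nginx.access-prod-eu"));
        // Parts[type=logs, dataset=nginx.access, namespace=prod-eu]
    }
}
```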

private String getDataset(IngestDocument ingestDocument, String datasetFallback) {
String dataset = this.dataset;
if (dataset == null) {
dataset = sanitizeDataStreamField(ingestDocument.getFieldValue(DATA_STREAM_DATASET, String.class, true), DISALLOWED_IN_DATASET);
}
if (dataset == null) {
dataset = datasetFallback;
}
return dataset;
}

private String getNamespace(IngestDocument ingestDocument, String namespaceFallback) {
String namespace = this.namespace;
if (namespace == null) {
namespace = sanitizeDataStreamField(
ingestDocument.getFieldValue(DATA_STREAM_NAMESPACE, String.class, true),
DISALLOWED_IN_NAMESPACE
);
}
if (namespace == null) {
namespace = namespaceFallback;
}
return namespace;
}

@Override
public String getType() {
return TYPE;
}

public String getDataStreamDataset() {
return dataset;
}

public String getDataStreamNamespace() {
return namespace;
}

public static final class Factory implements Processor.Factory {

@Override
public DataStreamRouterProcessor create(
Map<String, Processor.Factory> processorFactories,
String tag,
String description,
Map<String, Object> config
) throws Exception {
String dataset = ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, "dataset");
if (Objects.equals(sanitizeDataStreamField(dataset, DISALLOWED_IN_DATASET), dataset) == false) {
throw newConfigurationException(TYPE, tag, "dataset", "contains illegal characters");
}
String namespace = ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, "namespace");
if (Objects.equals(sanitizeDataStreamField(namespace, DISALLOWED_IN_NAMESPACE), namespace) == false) {
throw newConfigurationException(TYPE, tag, "namespace", "contains illegal characters");
}
return new DataStreamRouterProcessor(tag, description, dataset, namespace);
}
}
}
@@ -88,7 +88,8 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
entry(CommunityIdProcessor.TYPE, new CommunityIdProcessor.Factory()),
entry(FingerprintProcessor.TYPE, new FingerprintProcessor.Factory()),
entry(RegisteredDomainProcessor.TYPE, new RegisteredDomainProcessor.Factory()),
entry(RedactProcessor.TYPE, new RedactProcessor.Factory(matcherWatchdog))
entry(RedactProcessor.TYPE, new RedactProcessor.Factory(matcherWatchdog)),
entry(DataStreamRouterProcessor.TYPE, new DataStreamRouterProcessor.Factory())
);
}

@@ -0,0 +1,48 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.ingest.common;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;

import java.util.HashMap;
import java.util.Map;

import static org.hamcrest.CoreMatchers.nullValue;

public class DataStreamRouterProcessorFactoryTests extends ESTestCase {

public void testSuccess() throws Exception {
DataStreamRouterProcessor processor = create(null, null);
assertThat(processor.getDataStreamDataset(), nullValue());
assertThat(processor.getDataStreamNamespace(), nullValue());
}

public void testInvalidDataset() throws Exception {
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> create("my-service", null));
assertThat(e.getMessage(), Matchers.equalTo("[dataset] contains illegal characters"));
}

public void testInvalidNamespace() throws Exception {
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> create("generic", "foo:bar"));
assertThat(e.getMessage(), Matchers.equalTo("[namespace] contains illegal characters"));
}

private static DataStreamRouterProcessor create(String dataset, String namespace) throws Exception {
Map<String, Object> config = new HashMap<>();
if (dataset != null) {
config.put("dataset", dataset);
}
if (namespace != null) {
config.put("namespace", namespace);
}
return new DataStreamRouterProcessor.Factory().create(null, null, null, config);
}
}
@@ -0,0 +1,115 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.ingest.common;

import org.elasticsearch.ingest.CompoundProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.WrappingProcessor;
import org.elasticsearch.test.ESTestCase;

import static org.hamcrest.Matchers.equalTo;

public class DataStreamRouterProcessorTests extends ESTestCase {

public void testDefaults() throws Exception {
IngestDocument ingestDocument = createIngestDocument("logs-generic-default");

DataStreamRouterProcessor processor = new DataStreamRouterProcessor(null, null, null, null);
processor.execute(ingestDocument);
assertDataSetFields(ingestDocument, "logs", "generic", "default");
}

public void testSkipFirstProcessor() throws Exception {
IngestDocument ingestDocument = createIngestDocument("logs-generic-default");

DataStreamRouterProcessor skippedProcessor = new DataStreamRouterProcessor(null, null, "skip", null);
DataStreamRouterProcessor executedProcessor = new DataStreamRouterProcessor(null, null, "executed", null);
CompoundProcessor processor = new CompoundProcessor(new SkipProcessor(skippedProcessor), executedProcessor);
processor.execute(ingestDocument);
assertDataSetFields(ingestDocument, "logs", "executed", "default");
}

public void testSkipLastProcessor() throws Exception {
IngestDocument ingestDocument = createIngestDocument("logs-generic-default");

DataStreamRouterProcessor executedProcessor = new DataStreamRouterProcessor(null, null, "executed", null);
DataStreamRouterProcessor skippedProcessor = new DataStreamRouterProcessor(null, null, "skip", null);
CompoundProcessor processor = new CompoundProcessor(executedProcessor, skippedProcessor);
processor.execute(ingestDocument);
assertDataSetFields(ingestDocument, "logs", "executed", "default");
}

public void testDataStreamFieldsFromDocument() throws Exception {
IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
ingestDocument.setFieldValue("data_stream.dataset", "foo");
ingestDocument.setFieldValue("data_stream.namespace", "bar");

DataStreamRouterProcessor processor = new DataStreamRouterProcessor(null, null, null, null);
processor.execute(ingestDocument);
assertDataSetFields(ingestDocument, "logs", "foo", "bar");
}

public void testInvalidDataStreamFieldsFromDocument() throws Exception {
IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
ingestDocument.setFieldValue("data_stream.dataset", "foo-bar");
ingestDocument.setFieldValue("data_stream.namespace", "baz#qux");

DataStreamRouterProcessor processor = new DataStreamRouterProcessor(null, null, null, null);
processor.execute(ingestDocument);
assertDataSetFields(ingestDocument, "logs", "foo_bar", "baz_qux");
}

private void assertDataSetFields(IngestDocument ingestDocument, String type, String dataset, String namespace) {
assertThat(ingestDocument.getFieldValue("data_stream.type", String.class), equalTo(type));
assertThat(ingestDocument.getFieldValue("data_stream.dataset", String.class), equalTo(dataset));
assertThat(ingestDocument.getFieldValue("data_stream.namespace", String.class), equalTo(namespace));
assertThat(ingestDocument.getFieldValue("_index", String.class), equalTo(type + "-" + dataset + "-" + namespace));
}

private static IngestDocument createIngestDocument(String dataStream) {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
ingestDocument.setFieldValue("_index", dataStream);
return ingestDocument;
}

private static class SkipProcessor implements WrappingProcessor {
private final Processor processor;

SkipProcessor(Processor processor) {
this.processor = processor;
}

@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
return ingestDocument;
}

@Override
public Processor getInnerProcessor() {
return processor;
}

@Override
public String getType() {
return "skip";
}

@Override
public String getTag() {
return null;
}

@Override
public String getDescription() {
return null;
}
}
}