diff --git a/docs/changelog/131027.yaml b/docs/changelog/131027.yaml new file mode 100644 index 0000000000000..7bbe428aa0c2f --- /dev/null +++ b/docs/changelog/131027.yaml @@ -0,0 +1,6 @@ +pr: 131027 +summary: Handle structured log messages +area: Ingest Node +type: feature +issues: + - 130333 diff --git a/docs/reference/enrich-processor/normalize-for-stream.md b/docs/reference/enrich-processor/normalize-for-stream.md index ed004c7c8bd38..78f183c7f72ef 100644 --- a/docs/reference/enrich-processor/normalize-for-stream.md +++ b/docs/reference/enrich-processor/normalize-for-stream.md @@ -153,3 +153,87 @@ will be normalized into the following form: "trace_id": "abcdef1234567890abcdef1234567890" } ``` +## Structured `message` field + +If the `message` field in the ingested document is structured as a JSON, the +processor will determine whether it is in ECS format or not, based on the +existence or absence of the `@timestamp` field. If the `@timestamp` field is +present, the `message` field will be considered to be in ECS format, and its +contents will be merged into the root of the document and then normalized as +described above. The `@timestamp` from the `message` field will override the +root `@timestamp` field in the resulting document. +If the `@timestamp` field is absent, the `message` field will be moved to +the `body.structured` field as is, without any further normalization. + +For example, if the `message` field is an ECS-JSON, as follows: + +```json +{ + "@timestamp": "2023-10-01T12:00:00Z", + "message": "{\"@timestamp\":\"2023-10-01T12:01:00Z\",\"log.level\":\"INFO\",\"service.name\":\"my-service\",\"message\":\"The actual log message\",\"http\":{\"method\":\"GET\",\"url\":{\"path\":\"/api/v1/resource\"}}}" + +} +``` +it will be normalized into the following form: + +```json +{ + "@timestamp": "2023-10-01T12:01:00Z", + "severity_text": "INFO", + "body": { + "text": "The actual log message" + }, + "resource": { + "attributes": { + "service.name": "my-service" + } + }, + "attributes": { + "http.method": "GET", + "http.url.path": "/api/v1/resource" + } +} +``` + +However, if the `message` field is not recognized as ECS format, as follows: + +```json +{ + "@timestamp": "2023-10-01T12:00:00Z", + "log": { + "level": "INFO" + }, + "service": { + "name": "my-service" + }, + "tags": ["user-action", "api-call"], + "message": "{\"root_cause\":\"Network error\",\"http\":{\"method\":\"GET\",\"url\":{\"path\":\"/api/v1/resource\"}}}" +} +``` +it will be normalized into the following form: + +```json +{ + "@timestamp": "2023-10-01T12:00:00Z", + "severity_text": "INFO", + "resource": { + "attributes": { + "service.name": "my-service" + } + }, + "attributes": { + "tags": ["user-action", "api-call"] + }, + "body": { + "structured": { + "root_cause": "Network error", + "http": { + "method": "GET", + "url": { + "path": "/api/v1/resource" + } + } + } + } +} +``` diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java index 6e517d644cadb..f349606449aac 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java @@ -21,6 +21,7 @@ import org.elasticsearch.ingest.PipelineProcessor; import org.elasticsearch.ingest.Processor; import org.elasticsearch.plugins.ActionPlugin; +import org.elasticsearch.plugins.ExtensiblePlugin; import org.elasticsearch.plugins.IngestPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestController; @@ -33,7 +34,7 @@ import static java.util.Map.entry; -public class IngestCommonPlugin extends Plugin implements ActionPlugin, IngestPlugin { +public class IngestCommonPlugin extends Plugin implements ActionPlugin, IngestPlugin, ExtensiblePlugin { public IngestCommonPlugin() {} diff --git a/modules/ingest-otel/build.gradle b/modules/ingest-otel/build.gradle index 54a00508a0a07..e2c4d53ca68bd 100644 --- a/modules/ingest-otel/build.gradle +++ b/modules/ingest-otel/build.gradle @@ -12,6 +12,14 @@ apply plugin: 'elasticsearch.internal-yaml-rest-test' esplugin { description = 'Ingest processor that normalizes ECS documents to OpenTelemetry-compatible namespaces' classname ='org.elasticsearch.ingest.otel.NormalizeForStreamPlugin' + extendedPlugins = ['ingest-common'] +} + +dependencies { + compileOnly(project(':modules:ingest-common')) + compileOnly project(':modules:lang-painless:spi') + clusterModules project(':modules:ingest-common') + clusterModules project(':modules:lang-painless') } restResources { diff --git a/modules/ingest-otel/src/main/java/module-info.java b/modules/ingest-otel/src/main/java/module-info.java index 20b349d930c85..d256fb9a168ee 100644 --- a/modules/ingest-otel/src/main/java/module-info.java +++ b/modules/ingest-otel/src/main/java/module-info.java @@ -10,4 +10,6 @@ module org.elasticsearch.ingest.otel { requires org.elasticsearch.base; requires org.elasticsearch.server; + requires org.apache.logging.log4j; + requires org.elasticsearch.ingest.common; } diff --git a/modules/ingest-otel/src/main/java/org/elasticsearch/ingest/otel/NormalizeForStreamProcessor.java b/modules/ingest-otel/src/main/java/org/elasticsearch/ingest/otel/NormalizeForStreamProcessor.java index d0b2385916823..bde8420c8021c 100644 --- a/modules/ingest-otel/src/main/java/org/elasticsearch/ingest/otel/NormalizeForStreamProcessor.java +++ b/modules/ingest-otel/src/main/java/org/elasticsearch/ingest/otel/NormalizeForStreamProcessor.java @@ -9,11 +9,14 @@ package org.elasticsearch.ingest.otel; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.common.util.Maps; import org.elasticsearch.ingest.AbstractProcessor; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Processor; +import org.elasticsearch.ingest.common.JsonProcessor; import java.util.HashMap; import java.util.HashSet; @@ -60,6 +63,8 @@ public class NormalizeForStreamProcessor extends AbstractProcessor { * OpenTelemetry-compatible fields that are renamed by the processor. */ private static final Set KEEP_KEYS; + private static final Logger log = LogManager.getLogger(NormalizeForStreamProcessor.class); + static { Set keepKeys = new HashSet<>(Set.of("@timestamp", "attributes", "resource")); Set renamedTopLevelFields = new HashSet<>(); @@ -103,6 +108,41 @@ public IngestDocument execute(IngestDocument document) { // non-OTel document + // handling structured messages + Map body = null; + try { + String message = document.getFieldValue("message", String.class, true); + if (message != null) { + message = message.trim(); + if (message.startsWith("{") && message.endsWith("}")) { + // if the message is a JSON object, we assume it is a structured log + Object parsedMessage = JsonProcessor.apply(message, true, true); + if (parsedMessage instanceof Map) { + @SuppressWarnings("unchecked") + Map messageMap = (Map) parsedMessage; + if (messageMap.containsKey("@timestamp")) { + log.debug( + "Handling structured message with @timestamp field, assuming ECS-JSON format, merging into root document" + ); + source.remove("message"); + JsonProcessor.recursiveMerge(source, messageMap); + } else { + log.debug( + "Handling structured message without @timestamp field, assuming non-ECS format, moving to 'body.structured'" + ); + body = new HashMap<>(); + body.put(STRUCTURED_KEY, messageMap); + source.remove("message"); + } + } else { + log.debug("Structured message is not a JSON object, keeping it as a string in 'body.text' field: {}", message); + } + } + } + } catch (Exception e) { + log.warn("Failed to parse structured message, keeping it as a string in 'body.text' field: {}", e.getMessage()); + } + Map newAttributes = new HashMap<>(); // The keep keys indicate the fields that should be kept at the top level later on when applying the namespacing. // However, at this point we need to move their original values (if they exist) to the one of the new attributes namespaces, except @@ -117,6 +157,11 @@ public IngestDocument execute(IngestDocument document) { } } + // if the body is not null, it means we have a structured log that we need to move to the body.structured field. + if (body != null) { + source.put(BODY_KEY, body); + } + source.put(ATTRIBUTES_KEY, newAttributes); renameSpecialKeys(document); diff --git a/modules/ingest-otel/src/test/java/org/elasticsearch/ingest/otel/NormalizeForStreamProcessorTests.java b/modules/ingest-otel/src/test/java/org/elasticsearch/ingest/otel/NormalizeForStreamProcessorTests.java index bf2ef92c23dcb..d1a504f8b0ee1 100644 --- a/modules/ingest-otel/src/test/java/org/elasticsearch/ingest/otel/NormalizeForStreamProcessorTests.java +++ b/modules/ingest-otel/src/test/java/org/elasticsearch/ingest/otel/NormalizeForStreamProcessorTests.java @@ -9,9 +9,14 @@ package org.elasticsearch.ingest.otel; +import org.elasticsearch.common.Strings; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentFactory; +import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -438,6 +443,202 @@ public void testExecute_arraysNotFlattened() { assertNull(attributes.get("service")); } + /** + * Test for ECS-JSON {@code message} field normalization. + *

+ * Input document: + *

+     * {
+     *   "@timestamp": "2023-10-01T12:00:00Z",
+     *   "message": "{
+     *     \"@timestamp\": \"2023-10-02T12:00:00Z\",
+     *     \"log.level\": \"INFO\",
+     *     \"service.name\": \"my-service\",
+     *     \"message\": \"The actual log message\",
+     *     \"http\": {
+     *       \"method\": \"GET\",
+     *       \"url\": {
+     *         \"path\": \"/api/v1/resource\"
+     *       }
+     *     }
+     *   }"
+     * }
+     * 
+ *

+ * Expected output document: + *

+     * {
+     *   "@timestamp": "2023-10-02T12:00:00Z",
+     *   "severity_text": "INFO",
+     *   "body": {
+     *     "text": "The actual log message"
+     *   },
+     *   "resource": {
+     *     "attributes": {
+     *       "service.name": "my-service"
+     *     }
+     *   },
+     *   "attributes": {
+     *     "http.method": "GET",
+     *     "http.url.path": "/api/v1/resource"
+     *   }
+     * }
+     * 
+ */ + public void testExecute_ecsJsonMessageNormalization() throws IOException { + Map httpUrl = new HashMap<>(); + httpUrl.put("path", "/api/v1/resource"); + + Map http = new HashMap<>(); + http.put("method", "GET"); + http.put("url", httpUrl); + + Map message = new HashMap<>(); + message.put("@timestamp", "2023-10-02T12:00:00Z"); + message.put("log.level", "INFO"); + message.put("service.name", "my-service"); + message.put("message", "The actual log message"); + message.put("http", http); + + Map source = new HashMap<>(); + source.put("@timestamp", "2023-10-01T12:00:00Z"); + source.put("message", representJsonAsString(message)); + + IngestDocument document = new IngestDocument("index", "id", 1, null, null, source); + processor.execute(document); + + Map result = document.getSource(); + + assertEquals("2023-10-02T12:00:00Z", result.get("@timestamp")); + assertEquals("INFO", result.get("severity_text")); + assertEquals("The actual log message", get(get(result, "body"), "text")); + assertEquals(Map.of("service.name", "my-service"), get(get(result, "resource"), "attributes")); + assertEquals(Map.of("http.method", "GET", "http.url.path", "/api/v1/resource"), get(result, "attributes")); + } + + /** + * Test for non-ECS-JSON {@code message} field normalization. + *

+ * Input document: + *

+     * {
+     *   "@timestamp": "2023-10-01T12:00:00Z",
+     *   "log": {
+     *     "level": "INFO"
+     *   },
+     *   "service": {
+     *     "name": "my-service"
+     *   },
+     *   "tags": ["user-action", "api-call"],
+     *   "message": "{
+     *     \"root_cause\": \"Network error\",
+     *     \"http\": {
+     *       \"method\": \"GET\",
+     *       \"url\": {
+     *         \"path\": \"/api/v1/resource\"
+     *       }
+     *     }
+     *   }"
+     * }
+     * 
+ *

+ * Expected output document: + *

+     * {
+     *   "@timestamp": "2023-10-01T12:00:00Z",
+     *   "severity_text": "INFO",
+     *   "resource": {
+     *     "attributes": {
+     *       "service.name": "my-service"
+     *     }
+     *   },
+     *   "attributes": {
+     *     "tags": ["user-action", "api-call"]
+     *   },
+     *   "body": {
+     *     "structured": {
+     *       "root_cause": "Network error",
+     *       "http": {
+     *         "method": "GET",
+     *         "url": {
+     *           "path": "/api/v1/resource"
+     *         }
+     *       }
+     *     }
+     *   }
+     * }
+     * 
+ */ + public void testExecute_nonEcsJsonMessageNormalization() throws IOException { + Map httpUrl = new HashMap<>(); + httpUrl.put("path", "/api/v1/resource"); + + Map http = new HashMap<>(); + http.put("method", "GET"); + http.put("url", httpUrl); + + Map message = new HashMap<>(); + message.put("root_cause", "Network error"); + message.put("http", http); + + Map log = new HashMap<>(); + log.put("level", "INFO"); + + Map service = new HashMap<>(); + service.put("name", "my-service"); + + Map source = new HashMap<>(); + source.put("@timestamp", "2023-10-01T12:00:00Z"); + source.put("log", log); + source.put("service", service); + source.put("tags", new ArrayList<>(List.of("user-action", "api-call"))); + source.put("message", representJsonAsString(message)); + + IngestDocument document = new IngestDocument("index", "id", 1, null, null, source); + processor.execute(document); + + Map result = document.getSource(); + + assertEquals("2023-10-01T12:00:00Z", result.get("@timestamp")); + assertEquals("INFO", result.get("severity_text")); + assertEquals(Map.of("service.name", "my-service"), get(get(result, "resource"), "attributes")); + assertEquals(Map.of("tags", List.of("user-action", "api-call")), get(result, "attributes")); + assertEquals(message, get(get(result, "body"), "structured")); + } + + @SuppressWarnings("unchecked") + public void testOtherPrimitiveMessage() { + Map source = new HashMap<>(); + source.put("message", 42); + IngestDocument document = new IngestDocument("index", "id", 1, null, null, source); + + processor.execute(document); + + Map result = document.getSource(); + assertEquals(42, ((Map) result.get("body")).get("text")); + } + + @SuppressWarnings("unchecked") + public void testObjectMessage() { + Map source = new HashMap<>(); + Map message = new HashMap<>(); + message.put("key1", "value1"); + message.put("key2", "value2"); + source.put("message", message); + IngestDocument document = new IngestDocument("index", "id", 1, null, null, source); + + processor.execute(document); + + Map result = document.getSource(); + assertEquals(message, ((Map) result.get("body")).get("text")); + } + + private static String representJsonAsString(Map json) throws IOException { + try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()) { + return Strings.toString(xContentBuilder.map(json)); + } + } + /** * A utility function for getting a key from a map and casting the result. */ diff --git a/modules/ingest-otel/src/yamlRestTest/resources/rest-api-spec/test/ingest-otel/20_normalize_json_message.yml b/modules/ingest-otel/src/yamlRestTest/resources/rest-api-spec/test/ingest-otel/20_normalize_json_message.yml new file mode 100644 index 0000000000000..50b8ae0676c3e --- /dev/null +++ b/modules/ingest-otel/src/yamlRestTest/resources/rest-api-spec/test/ingest-otel/20_normalize_json_message.yml @@ -0,0 +1,112 @@ +--- +setup: + - do: + ingest.put_pipeline: + id: "normalize_json_message" + body: + processors: + - normalize_for_stream: {} + +--- +teardown: + - do: + ingest.delete_pipeline: + id: "normalize_json_message" + ignore: 404 + +--- +"Test ECS JSON message normalization": + - do: + index: + index: normalize_json_message_test + id: "ECS-JSON-message" + pipeline: "normalize_json_message" + body: + "@timestamp": "2023-10-01T12:00:00Z" + agent: + name: "agentNameValue" + type: "agentTypeValue" + cloud: + region: "originalCloudRegion" + provider: "aws" + service: + name: "serviceNameValue" + message: | + { + "@timestamp": "2023-10-02T12:00:00Z", + "log.level": "WARN", + "span": { + "id": "spanIdValue" + }, + "trace.id": "traceIdValue", + "message": "This is the actual log message", + "cloud": { + "region": "overriddenCloudRegion", + "availability_zone": "availabilityZoneValue", + "service": { + "type": "serviceTypeValue" + } + }, + "process": { + "args": ["arg1", "arg2"] + }, + "tags": ["tag1", "tag2"] + } + + - do: + get: + index: normalize_json_message_test + id: "ECS-JSON-message" + - match: { _source.@timestamp: "2023-10-02T12:00:00Z" } + - match: { _source.resource.attributes.agent\.name: "agentNameValue" } + - match: { _source.resource.attributes.agent\.type: "agentTypeValue" } + - match: { _source.resource.attributes.cloud\.region: "overriddenCloudRegion" } + - match: { _source.resource.attributes.cloud\.availability_zone: "availabilityZoneValue" } + - match: { _source.resource.attributes.cloud\.provider: "aws" } + - match: { _source.resource.attributes.cloud\.service\.name: "serviceNameValue" } + - match: { _source.attributes.cloud\.service\.type: "serviceTypeValue" } + - match: { _source.resource.attributes.process\.args.0: "arg1" } + - match: { _source.resource.attributes.process\.args.1: "arg2" } + - match: { _source.body.text: "This is the actual log message" } + - match: { _source.severity_text: "WARN" } + - match: { _source.attributes.tags.0: "tag1" } + - match: { _source.attributes.tags.1: "tag2" } + +--- +"Test non-ECS JSON message normalization": + - do: + index: + index: normalize_json_message_test + id: "non-ECS-JSON-message" + pipeline: "normalize_json_message" + body: + "@timestamp": "2023-10-01T12:00:00Z" + agent: + name: "agentNameValue" + type: "agentTypeValue" + message: | + { + "message": "This is the actual log message", + "nested": { + "child": "nestedValue", + "next-level": { + "grandchild": "deepValue" + }, + "array": ["value1", "value2"] + }, + "cloud.region": "cloudRegionValue" + } + + - do: + get: + index: normalize_json_message_test + id: "non-ECS-JSON-message" + - match: { _source.@timestamp: "2023-10-01T12:00:00Z" } + - match: { _source.resource.attributes.agent\.name: "agentNameValue" } + - match: { _source.resource.attributes.agent\.type: "agentTypeValue" } + - match: { _source.body.structured.message: "This is the actual log message" } + - match: { _source.body.structured.nested.child: "nestedValue" } + - match: { _source.body.structured.nested.next-level.grandchild: "deepValue" } + - match: { _source.body.structured.nested.array.0: "value1" } + - match: { _source.body.structured.nested.array.1: "value2" } + - match: { _source.body.structured.cloud\.region: "cloudRegionValue" }