Handle structured log messages #131027

Merged (11 commits, Jul 27, 2025)

Changes from 10 commits
6 changes: 6 additions & 0 deletions docs/changelog/131027.yaml
@@ -0,0 +1,6 @@
pr: 131027
summary: Handle structured log messages
area: Ingest Node
type: feature
issues:
- 130333
84 changes: 84 additions & 0 deletions docs/reference/enrich-processor/normalize-for-stream.md
@@ -151,3 +151,87 @@ will be normalized into the following form:
"trace_id": "abcdef1234567890abcdef1234567890"
}
```
## Structured `message` field

If the `message` field of the ingested document contains structured JSON, the
processor determines whether it is in ECS format based on the presence of the
`@timestamp` field. If the `@timestamp` field is present, the `message` field
is considered to be in ECS format: its contents are merged into the root of
the document and then normalized as described above, with the `@timestamp`
from the `message` field overriding the root `@timestamp` field in the
resulting document. If the `@timestamp` field is absent, the parsed `message`
content is moved as-is to the `body.structured` field, without any further
normalization.

For example, if the `message` field contains ECS-JSON, as follows:

```json
{
"@timestamp": "2023-10-01T12:00:00Z",
"message": "{\"@timestamp\":\"2023-10-01T12:01:00Z\",\"log.level\":\"INFO\",\"service.name\":\"my-service\",\"message\":\"The actual log message\",\"http\":{\"method\":\"GET\",\"url\":{\"path\":\"/api/v1/resource\"}}}"
}
```
it will be normalized into the following form:

```json
{
"@timestamp": "2023-10-01T12:01:00Z",
"severity_text": "INFO",
"body": {
"text": "The actual log message"
},
"resource": {
"attributes": {
"service.name": "my-service"
}
},
"attributes": {
"http.method": "GET",
"http.url.path": "/api/v1/resource"
}
}
```

However, if the `message` field is not recognized as ECS format, as follows:

```json
{
"@timestamp": "2023-10-01T12:00:00Z",
"log": {
"level": "INFO"
},
"service": {
"name": "my-service"
},
"tags": ["user-action", "api-call"],
"message": "{\"root_cause\":\"Network error\",\"http\":{\"method\":\"GET\",\"url\":{\"path\":\"/api/v1/resource\"}}}"
}
```
it will be normalized into the following form:

```json
{
"@timestamp": "2023-10-01T12:00:00Z",
"severity_text": "INFO",
"resource": {
"attributes": {
"service.name": "my-service"
}
},
"attributes": {
"tags": ["user-action", "api-call"]
},
"body": {
"structured": {
"root_cause": "Network error",
"http": {
"method": "GET",
"url": {
"path": "/api/v1/resource"
}
}
}
}
}
```
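Both branches can be tried end to end with the simulate API. A minimal sketch, assuming the processor type is `normalize_for_stream` and that it takes no required options (neither detail appears in this diff):

```console
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      { "normalize_for_stream": {} }
    ]
  },
  "docs": [
    {
      "_source": {
        "@timestamp": "2023-10-01T12:00:00Z",
        "message": "{\"root_cause\":\"Network error\"}"
      }
    }
  ]
}
```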
@@ -21,6 +21,7 @@
import org.elasticsearch.ingest.PipelineProcessor;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.ExtensiblePlugin;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestController;
@@ -33,7 +34,7 @@

import static java.util.Map.entry;

public class IngestCommonPlugin extends Plugin implements ActionPlugin, IngestPlugin {
public class IngestCommonPlugin extends Plugin implements ActionPlugin, IngestPlugin, ExtensiblePlugin {

public IngestCommonPlugin() {}

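Implementing `ExtensiblePlugin` here is what allows `ingest-otel` to declare `extendedPlugins = ['ingest-common']` (see the build.gradle change below) and link against classes such as `JsonProcessor`. A minimal sketch of what the marker interface involves; the plugin class name is illustrative, while `ExtensiblePlugin` and its optional `loadExtensions` hook are the real `org.elasticsearch.plugins` API:

```java
import org.elasticsearch.plugins.ExtensiblePlugin;
import org.elasticsearch.plugins.Plugin;

// Sketch: implementing the marker interface is all a plugin needs for other
// plugins to list it under `extendedPlugins` and use its classes.
public class ExampleExtensiblePlugin extends Plugin implements ExtensiblePlugin {

    // Optional hook, called at startup with extensions contributed by the
    // extending plugins. IngestCommonPlugin keeps the default no-op, since
    // ingest-otel only needs classpath access, not an extension point.
    @Override
    public void loadExtensions(ExtensionLoader loader) {
        // nothing to collect in this sketch
    }
}
```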
8 changes: 8 additions & 0 deletions modules/ingest-otel/build.gradle
@@ -12,6 +12,14 @@ apply plugin: 'elasticsearch.internal-yaml-rest-test'
esplugin {
description = 'Ingest processor that normalizes ECS documents to OpenTelemetry-compatible namespaces'
classname = 'org.elasticsearch.ingest.otel.NormalizeForStreamPlugin'
extendedPlugins = ['ingest-common']
}

dependencies {
compileOnly(project(':modules:ingest-common'))
compileOnly project(':modules:lang-painless:spi')
clusterModules project(':modules:ingest-common')
clusterModules project(':modules:lang-painless')
}
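// A likely rationale for the shape above (inferred, not stated in this diff):
// `compileOnly` pairs with `extendedPlugins` in the esplugin block, since at
// runtime the ingest-common classes are supplied by the extended plugin's
// classloader and must not be bundled again here. The `clusterModules` lines
// only make both modules available to the yamlRestTest cluster.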

restResources {
2 changes: 2 additions & 0 deletions modules/ingest-otel/src/main/java/module-info.java
@@ -10,4 +10,6 @@
module org.elasticsearch.ingest.otel {
requires org.elasticsearch.base;
requires org.elasticsearch.server;
requires org.apache.logging.log4j;
requires org.elasticsearch.ingest.common;
}
@@ -9,11 +9,14 @@

package org.elasticsearch.ingest.otel;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.common.JsonProcessor;

import java.util.HashMap;
import java.util.HashSet;
@@ -60,6 +63,8 @@ public class NormalizeForStreamProcessor extends AbstractProcessor {
* OpenTelemetry-compatible fields that are renamed by the processor.
*/
private static final Set<String> KEEP_KEYS;
private static final Logger log = LogManager.getLogger(NormalizeForStreamProcessor.class);

static {
Set<String> keepKeys = new HashSet<>(Set.of("@timestamp", "attributes", "resource"));
Set<String> renamedTopLevelFields = new HashSet<>();
@@ -103,6 +108,41 @@ public IngestDocument execute(IngestDocument document) {

// non-OTel document

// handling structured messages
Map<String, Object> body = null;
try {
String message = document.getFieldValue("message", String.class, true);
if (message != null) {
message = message.trim();
if (message.startsWith("{") && message.endsWith("}")) {
// if the message is a JSON object, we assume it is a structured log
Object parsedMessage = JsonProcessor.apply(message, true, true);
**Review thread:**

**lukewhiting** (Contributor, Jul 23, 2025):

I'm assuming we already parse JSON in other pipelines but any new security concerns here as we are parsing arbitrary and possibly unsanitized inputs here?

**Author** (Contributor):

The new thing here is the fact that it will be on by default to logs-*-* data streams.
I am not familiar with the security concerns that may be related to such JSON parsing.
As for data sanitation - I guess that if sanitation is either applied on the message field or on any other fields after the JSON parsing - this is equivalent to existing sanitation of non-structured logs.
But let me ask for input on both issues.

**flash1293** (Contributor, Jul 24, 2025):

Small correction - it will be on for logs,logs.*, not logs-*-*.

About security concerns - are you concerned about the case where an attacker could control the message, but not the whole document, with this approach essentially giving them control over the whole document? It's a good thought, I think it's true but I don't see how it would be problematic. The whole idea of the logs stream is that it can handle arbitrary incoming data in a sensible, opinionated way.

**Contributor**:

It's a couple of things: Is there any way for a malicious payload to "break out" and mess up the document.

I'm also wondering in general how secure the JSON parsing code is given it will receive potentially end user crafted payloads (If the operator is logging user input from their apps) but I think that's a lesser concern as I guess we already use it elsewhere and we have the whole sandboxing / entitlements system protecting ES as a whole.
if (parsedMessage instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, Object> messageMap = (Map<String, Object>) parsedMessage;
if (messageMap.containsKey("@timestamp")) {
log.debug(
"Handling structured message with @timestamp field, assuming ECS-JSON format, merging into root document"
);
source.remove("message");
JsonProcessor.recursiveMerge(source, messageMap);
} else {
log.debug(
"Handling structured message without @timestamp field, assuming non-ECS format, moving to 'body.structured'"
);
body = new HashMap<>();
body.put(STRUCTURED_KEY, messageMap);
source.remove("message");
}
} else {
log.debug("Structured message is not a JSON object, keeping it as a string in 'body.text' field: {}", message);
}
}
}
} catch (Exception e) {
log.warn("Failed to parse structured message, keeping it as a string in 'body.text' field: {}", e.getMessage());
}

Map<String, Object> newAttributes = new HashMap<>();
// The keep keys indicate the fields that should be kept at the top level later on when applying the namespacing.
// However, at this point we need to move their original values (if they exist) to the one of the new attributes namespaces, except
@@ -117,6 +157,11 @@ public IngestDocument execute(IngestDocument document) {
}
}

// if the body is not null, it means we have a structured log that we need to move to the body.structured field.
if (body != null) {
source.put(BODY_KEY, body);
}

source.put(ATTRIBUTES_KEY, newAttributes);

renameSpecialKeys(document);
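Read in isolation, the heuristic above is small enough to sketch outside Elasticsearch. In the snippet below, Jackson stands in for `JsonProcessor`, a plain map stands in for the ingest document source, and a shallow `putAll` stands in for the recursive merge; the class and method names are illustrative only, not the actual implementation:

```java
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.HashMap;
import java.util.Map;

// Standalone sketch of the structured-message heuristic (illustrative only).
public final class StructuredMessageSketch {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @SuppressWarnings("unchecked")
    static void handleMessage(Map<String, Object> source) {
        Object raw = source.get("message");
        if (raw instanceof String s) {
            String message = s.trim();
            // Only strings that look like JSON objects are parsed.
            if (message.startsWith("{") && message.endsWith("}")) {
                try {
                    Object parsed = MAPPER.readValue(message, Object.class);
                    if (parsed instanceof Map) {
                        source.remove("message");
                        Map<String, Object> messageMap = (Map<String, Object>) parsed;
                        if (messageMap.containsKey("@timestamp")) {
                            // ECS-JSON: fold into the root document
                            // (the real code merges recursively).
                            source.putAll(messageMap);
                        } else {
                            // Non-ECS JSON: park it under body.structured.
                            Map<String, Object> body = new HashMap<>();
                            body.put("structured", messageMap);
                            source.put("body", body);
                        }
                    }
                } catch (Exception e) {
                    // Parse failure: leave the message string in place.
                }
            }
        }
    }
}
```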