-
Notifications
You must be signed in to change notification settings - Fork 25.3k
Handle structured log messages #131027
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
Handle structured log messages #131027
Changes from 10 commits
129a2a0
1346b94
b66d78a
9c80cff
115f629
fb76764
7a96b3c
37102c8
c807dc1
7208fce
f711124
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
pr: 131027 | ||
summary: Handle structured log messages | ||
area: Ingest Node | ||
type: feature | ||
issues: | ||
- 130333 |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,11 +9,14 @@ | |
|
||
package org.elasticsearch.ingest.otel; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.elasticsearch.cluster.metadata.ProjectId; | ||
import org.elasticsearch.common.util.Maps; | ||
import org.elasticsearch.ingest.AbstractProcessor; | ||
import org.elasticsearch.ingest.IngestDocument; | ||
import org.elasticsearch.ingest.Processor; | ||
import org.elasticsearch.ingest.common.JsonProcessor; | ||
|
||
import java.util.HashMap; | ||
import java.util.HashSet; | ||
|
@@ -60,6 +63,8 @@ public class NormalizeForStreamProcessor extends AbstractProcessor { | |
* OpenTelemetry-compatible fields that are renamed by the processor. | ||
*/ | ||
private static final Set<String> KEEP_KEYS; | ||
private static final Logger log = LogManager.getLogger(NormalizeForStreamProcessor.class); | ||
|
||
static { | ||
Set<String> keepKeys = new HashSet<>(Set.of("@timestamp", "attributes", "resource")); | ||
Set<String> renamedTopLevelFields = new HashSet<>(); | ||
|
@@ -103,6 +108,41 @@ public IngestDocument execute(IngestDocument document) { | |
|
||
// non-OTel document | ||
|
||
// handling structured messages | ||
Map<String, Object> body = null; | ||
try { | ||
String message = document.getFieldValue("message", String.class, true); | ||
if (message != null) { | ||
message = message.trim(); | ||
if (message.startsWith("{") && message.endsWith("}")) { | ||
// if the message is a JSON object, we assume it is a structured log | ||
Object parsedMessage = JsonProcessor.apply(message, true, true); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. — I'm assuming we already parse JSON in other pipelines, but are there any new security concerns here, as we are parsing arbitrary and possibly unsanitized inputs? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. — The new thing here is the fact that it will be on by default to […] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. — Small correction: it will be on for […] About security concerns — are you concerned about the case where an attacker could control the […] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. — It's a couple of things: Is there any way for a malicious payload to "break out" and mess up the document? I'm also wondering in general how secure the JSON parsing code is, given that it will receive potentially end-user-crafted payloads (if the operator is logging user input from their apps), but I think that's a lesser concern, as I guess we already use it elsewhere, and we have the whole sandboxing / entitlements system protecting ES as a whole.
||
if (parsedMessage instanceof Map) { | ||
@SuppressWarnings("unchecked") | ||
Map<String, Object> messageMap = (Map<String, Object>) parsedMessage; | ||
if (messageMap.containsKey("@timestamp")) { | ||
log.debug( | ||
"Handling structured message with @timestamp field, assuming ECS-JSON format, merging into root document" | ||
); | ||
source.remove("message"); | ||
JsonProcessor.recursiveMerge(source, messageMap); | ||
} else { | ||
log.debug( | ||
"Handling structured message without @timestamp field, assuming non-ECS format, moving to 'body.structured'" | ||
); | ||
body = new HashMap<>(); | ||
body.put(STRUCTURED_KEY, messageMap); | ||
source.remove("message"); | ||
} | ||
} else { | ||
log.debug("Structured message is not a JSON object, keeping it as a string in 'body.text' field: {}", message); | ||
lukewhiting marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
} | ||
} catch (Exception e) { | ||
eyalkoren marked this conversation as resolved.
Show resolved
Hide resolved
|
||
log.warn("Failed to parse structured message, keeping it as a string in 'body.text' field: {}", e.getMessage()); | ||
} | ||
|
||
Map<String, Object> newAttributes = new HashMap<>(); | ||
// The keep keys indicate the fields that should be kept at the top level later on when applying the namespacing. | ||
// However, at this point we need to move their original values (if they exist) to the one of the new attributes namespaces, except | ||
|
@@ -117,6 +157,11 @@ public IngestDocument execute(IngestDocument document) { | |
} | ||
} | ||
|
||
// if the body is not null, it means we have a structured log that we need to move to the body.structured field. | ||
if (body != null) { | ||
source.put(BODY_KEY, body); | ||
} | ||
|
||
source.put(ATTRIBUTES_KEY, newAttributes); | ||
|
||
renameSpecialKeys(document); | ||
|
Uh oh! There was an error while loading. Please reload this page. (Note: part of the diff and review thread failed to load and is not shown above.)