Description
To make ingesting ECS JSON logs more native and seamless, we want a user experience that does not require custom JSON parsing or other processing configuration. Instead, we should detect whether a log line is in ECS JSON format and parse it accordingly.
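For reference, an ECS JSON log line is a single-line JSON object that contains @timestamp and ecs.version, which is what the detection heuristic below keys on. A shortened version of the spring-petclinic sample used in the simulate request further down looks like this:

{"@timestamp":"2021-05-19T22:40:52.169Z", "ecs.version":"1.2.0", "log.level":"WARN", "message":"HikariPool-1 - Thread starvation or clock leap detected", "service.name":"spring-petclinic", "log.logger":"com.zaxxer.hikari.pool.HikariPool"}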
We have identified integrations and Elasticsearch ingest node pipelines to be the best place to automatically detect and parse ECS JSON logs, for the following reasons:
- Takes load off the edge (at the expense of more load on ES).
- Allows multiple integrations (log files, k8s, PCF, CloudWatch, ...) to use the same pipeline more easily.
- Not having the logic inside Beats allows using alternative shippers, such as Fluentd, more easily.
- Works well with integration packages that already rely mostly on ES pipelines for parsing.
I have implemented some improvements to the ES processors that now make it possible to properly handle ECS JSON logs. I've also created a POC for an ES ingest node pipeline:
POC ingest pipeline:
PUT _ingest/pipeline/logs-routing
{
  "processors": [
    {
      "pipeline": {
        "name": "logs-ecs-json",
        "if": "def message = ctx.message; return message != null && message.startsWith('{') && message.endsWith('}') && message.contains('\"@timestamp\"') && message.contains('\"ecs') && message.contains('version\"')"
      }
    },
    {
      "script": {
        "source": "ctx.data_stream.dataset = /[\\/*?\"<>|, #:-]/.matcher(ctx.data_stream.dataset).replaceAll('_')",
        "if": "ctx.data_stream?.dataset != null"
      }
    },
    {
      "script": {
        "source": "ctx.data_stream.namespace = /[\\/*?\"<>|, #:]/.matcher(ctx.data_stream.namespace).replaceAll('_')",
        "if": "ctx.data_stream?.namespace != null"
      }
    },
    {
      "set": {
        "field": "data_stream.type",
        "value": "logs",
        "override": false
      }
    },
    {
      "set": {
        "field": "data_stream.dataset",
        "value": "generic",
        "override": false
      }
    },
    {
      "set": {
        "field": "data_stream.namespace",
        "value": "default",
        "override": false
      }
    },
    {
      "set": {
        "field": "event.dataset",
        "copy_from": "data_stream.dataset",
        "override": true
      }
    },
    {
      "set": {
        "field": "_index",
        "value": "logs-{{{data_stream.dataset}}}-{{{data_stream.namespace}}}"
      }
    }
  ]
}
PUT _ingest/pipeline/logs-ecs-json
{
  "processors": [
    {
      "rename": {
        "field": "message",
        "target_field": "_ecs_json_message",
        "ignore_missing": true
      }
    },
    {
      "json": {
        "field": "_ecs_json_message",
        "add_to_root": true,
        "add_to_root_conflict_strategy": "merge",
        "allow_duplicate_keys": true,
        "if": "ctx.containsKey('_ecs_json_message')",
        "on_failure": [
          {
            "rename": {
              "field": "_ecs_json_message",
              "target_field": "message",
              "ignore_missing": true
            }
          },
          {
            "set": {
              "field": "error.message",
              "value": "Error while parsing JSON",
              "override": false
            }
          }
        ]
      }
    },
    {
      "remove": {
        "field": "_ecs_json_message",
        "ignore_missing": true
      }
    },
    {
      "dot_expander": {
        "field": "*",
        "override": true
      }
    },
    {
      "join": {
        "field": "error.stack_trace",
        "separator": "\n",
        "if": "ctx.error?.stack_trace instanceof Collection"
      }
    }
  ]
}
POST _ingest/pipeline/logs-routing/_simulate
{
  "docs": [
    {
      "_source": {
        "@timestamp": "2021-06-22T15:55:00.848Z",
        "agent": {
          "type": "filebeat",
          "version": "7.13.2",
          "hostname": "Felixs-MBP-2.fritz.box",
          "ephemeral_id": "d3706fd0-2f07-465f-8bee-b8fe1a22effe",
          "id": "de46bbbc-bb66-465a-96f5-bc69c7197ae6",
          "name": "Felixs-MBP-2.fritz.box"
        },
        "container": {
          "id": "spring-projects"
        },
        "message": "{\"@timestamp\":\"2021-05-19T22:40:52.169Z\", \"ecs.version\": \"1.2.0\", \"data_stream.dataset\": \"spring-petclinic.log\", \"log.level\": \"WARN\", \"message\":\"HikariPool-1 - Thread starvation or clock leap detected (housekeeper delta=4h23s964ms).\", \"service.name\":\"spring-petclinic\",\"process.thread.name\":\"HikariPool-1 housekeeper\",\"log.logger\":\"com.zaxxer.hikari.pool.HikariPool\",\"error.stack_trace\":[\"at foo.bar\", \"\\tat foo.bar\"]}",
        "event": {
          "dataset": "generic"
        },
        "elastic_agent": {
          "id": "f91381a2-a5db-400d-ac02-682ab7a619d5",
          "snapshot": false,
          "version": "7.13.2"
        },
        "log": {
          "offset": 0,
          "file": {
            "path": "/Users/felixbarnsteiner/projects/github/spring-projects/spring-petclinic/logs/app.log.json"
          }
        },
        "input": {
          "type": "log"
        },
        "data_stream": {
          "type": "log",
          "dataset": "generic",
          "namespace": "default"
        },
        "host": {
          "name": "Felixs-MBP-2.fritz.box",
          "hostname": "Felixs-MBP-2.fritz.box",
          "architecture": "x86_64",
          "os": {
            "platform": "darwin",
            "version": "10.16",
            "family": "darwin",
            "name": "Mac OS X",
            "kernel": "20.4.0",
            "build": "20E232",
            "type": "macos"
          },
          "id": "30E396A5-08D7-5AB9-9C39-BD9ED335AC24"
        },
        "ecs": {
          "version": "1.8.0"
        }
      }
    }
  ]
}
Eventually, all log-input-type integrations should leverage this pipeline to automatically handle ECS JSON logs.
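As a sketch of what that could look like (the integration pipeline name below is hypothetical), an integration's ingest pipeline would simply delegate to the shared routing pipeline via a pipeline processor, with any integration-specific processors running before or after that delegation:

PUT _ingest/pipeline/logs-my_integration-1.0.0
{
  "processors": [
    {
      "pipeline": {
        "name": "logs-routing"
      }
    }
  ]
}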
If the performance hit of the scripts used in the pipeline turns out to be an issue, we can think about implementing a dedicated processor in Elasticsearch with a pure Java implementation.
We may want to add an option to the integration settings that lets users opt out of ECS JSON auto-detection in case they're worried about the potential additional impact. That could work by adding a tag to the events and conditionally executing the ECS pipeline (see the sketch after the list below).
Relevant integrations and related issues:
- [Feature Request] Enable ECS log detection by default using Custom Log integration #1332
- CloudWatch logs integration
- Azure logs integration
- K8s logs integration
- PCF logs integration
- httpjson
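A minimal sketch of that tag-based opt-out (the tag name skip_ecs_json_detection is made up for illustration): the first processor of logs-routing would additionally check that the tag is absent before calling the ECS JSON pipeline, on top of the format check shown earlier:

{
  "pipeline": {
    "name": "logs-ecs-json",
    "if": "(ctx.tags == null || !ctx.tags.contains('skip_ecs_json_detection')) && ctx.message != null && ctx.message.startsWith('{') && ctx.message.endsWith('}')"
  }
}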
Where to start?
The custom logs integration comes to mind first, but it seems there are some dependencies and open questions. As there are already integrations that include pipelines for CloudWatch and Azure logs, these might be a good starting point.
Open questions
- How to apply the pipeline for input-only integrations. See also [Feature Request] Enable ECS log detection by default using Custom Log integration #1332 (comment)
- How to allow user-defined pipelines that run after the built-in ones (maybe just a plain-text input in the integration settings? see the sketch at the end of this section)
- Can we define the pipeline in a single place instead of copy/pasting it into every integration?
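On the second open question, one conceivable shape (the logs-custom pipeline name and the plain-text setting are assumptions for illustration) is a trailing pipeline processor that calls a user-defined pipeline and tolerates it being absent:

{
  "pipeline": {
    "name": "logs-custom",
    "ignore_failure": true
  }
}

The built-in processors would run first, and whatever the user configures in the settings would live in the logs-custom pipeline.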