Skip to content

[Meta] Enable seamless ECS log onboarding for all log inputs #1454

Closed
@felixbarny

Description

@felixbarny

To make the ingest of ECS JSON logs more native and seamless, we want a user experience that does not require custom configuration, JSON parsing, and other processing configuration. Instead, we should detect if a log comes in ECS JSON format and parse it appropriately.

We have identified integrations and Elasticsearch ingest node pipelines to be the best place to automatically detect and parse ECS JSON logs, for the following reasons:

  • Takes load off the edge (at the expense of more load on ES).
  • Allows multiple integrations (log files, k8s, PCF, CloudWatch, ...) to use the same pipeline more easily.
  • Not having the logic inside Beats allows using alternative shippers, such as Fluentd, more easily.
  • Works well with integration packages that already rely mostly on ES pipelines for parsing.

I have implemented some improvements for the ES processors that now allow us to properly handle ECS JSON logs. I've also created a POC for an ES ingest node pipeline:

Click here to see POC ingest pipeline

PUT _ingest/pipeline/logs-routing
{
  "processors": [
    {
      "pipeline": {
        "name": "logs-ecs-json",
        "if": "def message = ctx.message; return message != null && message.startsWith('{') && message.endsWith('}') && message.contains('\"@timestamp\"') && message.contains('\"ecs') && message.contains('version\"')"
      }
    },
    {
      "script": {
        "source": "ctx.data_stream.dataset = /[\\/*?\"<>|, #:-]/.matcher(ctx.data_stream.dataset).replaceAll('_')",
        "if": "ctx.data_stream?.dataset != null"
      }
    },
    {
      "script": {
        "source": "ctx.data_stream.namespace = /[\\/*?\"<>|, #:]/.matcher(ctx.data_stream.namespace).replaceAll('_')",
        "if": "ctx.data_stream?.namespace != null"
      }
    },
    {
      "set": {
        "field": "data_stream.type",
        "value": "logs",
        "override": false
      }
    },
    {
      "set": {
        "field": "data_stream.dataset",
        "value": "generic",
        "override": false
      }
    },
    {
      "set": {
        "field": "data_stream.namespace",
        "value": "default",
        "override": false
      }
    },
    {
      "set": {
        "field": "event.dataset",
        "copy_from": "data_stream.dataset",
        "override": true
      }
    },
    {
      "set": {
        "field": "_index",
        "value": "logs-{{{data_stream.dataset}}}-{{{data_stream.namespace}}}"
      }
    }
  ]
}

PUT _ingest/pipeline/logs-ecs-json
{
  "processors": [
    {
      "rename": {
        "field": "message",
        "target_field": "_ecs_json_message",
        "ignore_missing": true
      }
    },
    {
      "json": {
        "field": "_ecs_json_message",
        "add_to_root": true,
        "add_to_root_conflict_strategy": "merge",
        "allow_duplicate_keys": true,
        "if": "ctx.containsKey('_ecs_json_message')",
        "on_failure": [
          {
            "rename": {
              "field": "_ecs_json_message",
              "target_field": "message",
              "ignore_missing": true
            }
          },
          {
            "set": {
              "field": "error.message",
              "value": "Error while parsing JSON",
              "override": false
            }
          }
        ]
      }
    },
    {
      "remove": {
        "field": "_ecs_json_message",
        "ignore_missing": true
      }
    },
    {
      "dot_expander": {
        "field": "*",
        "override": true
      }
    },
    {
      "join": {
        "field": "error.stack_trace",
        "separator": "\n",
        "if": "ctx.error?.stack_trace instanceof Collection"
      }
    }
  ]
}


POST _ingest/pipeline/logs-routing/_simulate
{
  "docs": [
    {
      "_source": {
        "@timestamp": "2021-06-22T15:55:00.848Z",
        "agent": {
          "type": "filebeat",
          "version": "7.13.2",
          "hostname": "Felixs-MBP-2.fritz.box",
          "ephemeral_id": "d3706fd0-2f07-465f-8bee-b8fe1a22effe",
          "id": "de46bbbc-bb66-465a-96f5-bc69c7197ae6",
          "name": "Felixs-MBP-2.fritz.box"
        },
        "container": {
          "id": "spring-projects"
        },
        "message": "{\"@timestamp\":\"2021-05-19T22:40:52.169Z\", \"ecs.version\": \"1.2.0\", \"data_stream.dataset\": \"spring-petclinic.log\", \"log.level\": \"WARN\", \"message\":\"HikariPool-1 - Thread starvation or clock leap detected (housekeeper delta=4h23s964ms).\", \"service.name\":\"spring-petclinic\",\"process.thread.name\":\"HikariPool-1 housekeeper\",\"log.logger\":\"com.zaxxer.hikari.pool.HikariPool\",\"error.stack_trace\":[\"at foo.bar\", \"\\tat foo.bar\"]}",
        "event": {
          "dataset": "generic"
        },
        "elastic_agent": {
          "id": "f91381a2-a5db-400d-ac02-682ab7a619d5",
          "snapshot": false,
          "version": "7.13.2"
        },
        "log": {
          "offset": 0,
          "file": {
            "path": "/Users/felixbarnsteiner/projects/github/spring-projects/spring-petclinic/logs/app.log.json"
          }
        },
        "input": {
          "type": "log"
        },
        "data_stream": {
          "type": "log",
          "dataset": "generic",
          "namespace": "default"
        },
        "host": {
          "name": "Felixs-MBP-2.fritz.box",
          "hostname": "Felixs-MBP-2.fritz.box",
          "architecture": "x86_64",
          "os": {
            "platform": "darwin",
            "version": "10.16",
            "family": "darwin",
            "name": "Mac OS X",
            "kernel": "20.4.0",
            "build": "20E232",
            "type": "macos"
          },
          "id": "30E396A5-08D7-5AB9-9C39-BD9ED335AC24"
        },
        "ecs": {
          "version": "1.8.0"
        }
      }
    }
  ]
}

Eventually, all log-input-type integrations should leverage this pipeline to automatically handle ECS JSON logs.
If the performance hit of the scripts used in the pipeline turns out to be an issue, we can think about implementing a dedicated processor in Elasticsearch with a pure Java implementation.

We may want to add an option to the integration settings for users to opt out of auto-detection of ECS JSON in case they're worried about the potential additional performance impact. That can work by adding a tag to the events and conditionally executing the ECS pipeline.

Where to start?
The custom logs integration comes to mind first, but it seems there are some dependencies and open questions. As there are already integrations that include pipelines for CloudWatch and Azure logs, these might be a better starting point.

Open questions

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions