Home > front end >  Have a key value pair as logstash output, by only using grok filter
Have a key value pair as logstash output, by only using grok filter

Time:10-23

I am working on a spring boot project and using ELK stack for logging and auditing. I need a logstash.conf file which will process logs and the output can have dynamic key-value pairs. This output data will be used for auditing.

Adding an example for better clarity

Example:

Sample log:

[INFO] [3b1d04f219fc43d18ccb6cb22db6cff4] 2021-10-13_13:43:09.074 Audit_ key1:value1| key2:value2| key3:value3| keyN:valueN

Required logstash output:

{
  "logLevel": [
    [
      "INFO"
    ]
  ],
  "threadId": [
    [
      "3b1d04f219fc43d18ccb6cb22db6cff4"
    ]
  ],
  "timeStamp": [
    [
      "2021-10-13_13:43:09.074"
    ]
  ],
  "class": [
    [
      "Audit_"
    ]
  ],
  "key1": [
    [
      "value1"
    ]
  ],
  "key2": [
    [
      "value2"
    ]
  ],
  "key3": [
    [
      "value3"
    ]
  ],
  "keyN": [
    [
      "valueN"
    ]
  ]
}

Note:

  • "key" will always be a word or string value
  • "value" can be word, numeric or sentence(string with spaces)
  • ":" is the separator between key and value
  • "|" is the separator between key-value pairs
  • The number of key-value pairs can vary.

Can someone suggest/help me with the match pattern to be used here? I am only allowed to use grok filter.

CodePudding user response:

Thank you for guidance Filip and leandrojmp!

Just using a grok filter for this, would make it very complex and also it wont support dynamic key-value pairs.

So I went with a combination of grok followed by kv filter. And this approach worked for me.

Sample Log:

[INFO] [3b1d04f219fc43d18ccb6cb22db6cff4] 2021-10-13_13:43:09.074 _Audit_ key1:value1| key2:value2| key3:value3| keyN:valueN

logstash.conf file:

input {
  beats {
        port => "5044"
    }
}
filter {
  grok {
  match => {"message" => "\[%{LOGLEVEL:logLevel}\]\ \[%{WORD:traceId}\]\ (?<timestamp>[0-9\-_:\.]*)\ %{WORD:class}\ %{GREEDYDATA:message}"]}
    overwrite => [ "message" ]
    }
    if [class] == "_Audit_" {
      kv {
        source => "message"
        field_split => "&"
        value_split => "="
        remove_field => ["message"]
      }
    }
}
output {
  if [class] == "_Audit_" {
    elasticsearch { 
        hosts => ["localhost:9200"] 
        index => "audit-logs-%{ YYYY.MM.dd}"
      }
  }
  else {
    elasticsearch { 
        hosts => ["localhost:9200"] 
        index => "normal-logs-%{ YYYY.MM.dd}"
      }
  }
}
  • Related