I am sending nginx logs to Elasticsearch using Filebeat and Logstash. My logs have the following form:
000.000.000.000 - - [17/Oct/2022:08:25:18 +0000] "OPTIONS /favicon.svg HTTP/1.1" 405 559 "https://example.net/auth/login" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36" "111.111.111.111, 222.222.222.222"
I have the following Logstash configuration file:
input {
  beats {
    port => 5035
  }
}
filter {
  grok {
    match => [ "message", "%{COMBINEDAPACHELOG} %{GREEDYDATA:http_x_forwarded_for}" ]
  }
  mutate {
    convert => ["response", "integer"]
    convert => ["bytes", "integer"]
    convert => ["responsetime", "float"]
  }
  geoip {
    source => "clientip"
    target => "geoip"
    add_tag => [ "nginx-geoip" ]
  }
  date {
    match => [ "timestamp", "dd/MMM/YYYY:HH:mm:ss Z" ]
  }
  useragent {
    source => "message"
  }
}
output {
  elasticsearch {
    hosts => "elasticsearch:9200"
    index => "weblogs-%{+YYYY.MM.dd}"
    document_type => "nginx_logs"
    user => "elastic"
    password => "changeme"
  }
  stdout { codec => rubydebug }
}
This pipeline saves the logs to Elasticsearch in the following form:
"response" : 405,
"timestamp" : "17/Oct/2022:08:25:18 0000",
"os_version" : "10",
"auth" : "-",
"verb" : "OPTIONS",
"clientip" : "000.000.000.000",
"httpversion" : "1.1",
"referrer" : "\"https://example.net/auth/login\"",
"geoip" : { },
"os" : "Windows",
"os_name" : "Windows",
"agent" : {
"version" : "7.17.6",
"hostname" : "0242869f2486",
"type" : "filebeat",
"id" : "4de3a108-35bf-4bd9-8b18-a5d8f9f2bc83",
"ephemeral_id" : "3a5f78b5-bae0-41f6-8d63-eea700df6c3c",
"name" : "0242869f2486"
},
"log" : {
"file" : {
"path" : "/var/log/nginx/access.log"
},
"offset" : 1869518
},
"bytes" : 559,
"ident" : "-",
"http_x_forwarded_for" : " \"111.111.111.111, 222.222.222.222\"",
"os_full" : "Windows 10",
"@timestamp" : "2022-10-17T08:25:18.000Z",
"request" : "/favicon.svg",
"device" : "Spider",
"name" : "favicon",
"input" : {
"type" : "log"
},
"host" : {
"name" : "0242869f2486"
},
"os_major" : "10",
"@version" : "1",
"message" : "000.000.000.000 - - [17/Oct/2022:08:25:18 0000] \"OPTIONS /favicon.svg HTTP/1.1\" 405 559 \"https://example.net/auth/login\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36\" \"111.111.111.111, 222.222.222.222\"",
"tags" : [
"beats_input_codec_plain_applied",
"_geoip_lookup_failure"
]
However, my goal is to parse the first IP from the http_x_forwarded_for field, add a new field called real_client_ip, and save it to the index. Is there a way to achieve that?
CodePudding user response:
You can add one more grok filter to your Logstash pipeline, after the first grok filter. Since grok captures the first occurrence that matches the pattern, %{IP:real_client_ip} will pick up the first address in the http_x_forwarded_for list:
filter {
  grok {
    match => [ "message", "%{COMBINEDAPACHELOG} %{GREEDYDATA:http_x_forwarded_for}" ]
  }
  grok {
    # %{IP} matches the first address in the comma-separated X-Forwarded-For list
    match => [ "http_x_forwarded_for", "%{IP:real_client_ip}" ]
  }
  mutate {
    convert => ["response", "integer"]
    convert => ["bytes", "integer"]
    convert => ["responsetime", "float"]
  }
  geoip {
    source => "clientip"
    target => "geoip"
    add_tag => [ "nginx-geoip" ]
  }
  date {
    match => [ "timestamp", "dd/MMM/YYYY:HH:mm:ss Z" ]
  }
  useragent {
    source => "message"
  }
}
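With this in place, the indexed document should gain a new field; using the sample log line from your question, it would look like:

"real_client_ip" : "111.111.111.111"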
PS: I have validated the grok pattern in Kibana, but not by running the Logstash pipeline; it should work for your use case.
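If you would rather avoid a second grok pass, a ruby filter can do the same extraction by splitting on the comma. This is an untested sketch that assumes http_x_forwarded_for has the quoted form shown in your output:

filter {
  ruby {
    # Untested sketch: strip the surrounding double quotes and whitespace,
    # then keep everything before the first comma.
    code => '
      xff = event.get("http_x_forwarded_for")
      if xff
        event.set("real_client_ip", xff.delete(%q(")).split(",").first.to_s.strip)
      end
    '
  }
}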