Logstash nginx parser for http_x_forwarded_for

I am sending nginx logs to Elasticsearch using Filebeat and Logstash. My logs have the following form:

000.000.000.000 - - [17/Oct/2022:08:25:18 +0000] "OPTIONS /favicon.svg HTTP/1.1" 405 559 "https://example.net/auth/login" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36" "111.111.111.111, 222.222.222.222"
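
(For context, a line of this shape usually comes from an nginx log_format that appends $http_x_forwarded_for to the standard combined format, roughly like the sketch below; the format name combined_xff is made up and the directive in your nginx.conf may differ.)

# hypothetical name; adapt to the log_format actually set in nginx.conf
log_format combined_xff '$remote_addr - $remote_user [$time_local] '
                        '"$request" $status $body_bytes_sent '
                        '"$http_referer" "$http_user_agent" '
                        '"$http_x_forwarded_for"';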

I have the following configuration file for logstash:

input {
    beats {
        port => 5035
    }
}

filter {
    grok {
        match => [ "message" , "%{COMBINEDAPACHELOG} %{GREEDYDATA:http_x_forwarded_for}"]
    }
    mutate {
        convert => ["response", "integer"]
        convert => ["bytes", "integer"]
        convert => ["responsetime", "float"]
    }
    geoip {
        source => "clientip"
        target => "geoip"
        add_tag => [ "nginx-geoip" ]
    }
    date {
        match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]
    }
    useragent {
        source => "message"
    }
}

output {
    elasticsearch {
        hosts => "elasticsearch:9200"
        index => "weblogs-%{+YYYY.MM.dd}"
        document_type => "nginx_logs"
        user => "elastic"
        password => "changeme"
    }
    stdout { codec => rubydebug }
}

This pipeline saves the logs to elasticsearch in the following form:

          "response" : 405,
          "timestamp" : "17/Oct/2022:08:25:18  0000",
          "os_version" : "10",
          "auth" : "-",
          "verb" : "OPTIONS",
          "clientip" : "000.000.000.000",
          "httpversion" : "1.1",
          "referrer" : "\"https://example.net/auth/login\"",
          "geoip" : { },
          "os" : "Windows",
          "os_name" : "Windows",
          "agent" : {
            "version" : "7.17.6",
            "hostname" : "0242869f2486",
            "type" : "filebeat",
            "id" : "4de3a108-35bf-4bd9-8b18-a5d8f9f2bc83",
            "ephemeral_id" : "3a5f78b5-bae0-41f6-8d63-eea700df6c3c",
            "name" : "0242869f2486"
          },
          "log" : {
            "file" : {
              "path" : "/var/log/nginx/access.log"
            },
            "offset" : 1869518
          },
          "bytes" : 559,
          "ident" : "-",
          "http_x_forwarded_for" : " \"111.111.111.111, 222.222.222.222\"",
          "os_full" : "Windows 10",
          "@timestamp" : "2022-10-17T08:25:18.000Z",
          "request" : "/favicon.svg",
          "device" : "Spider",
          "name" : "favicon",
          "input" : {
            "type" : "log"
          },
          "host" : {
            "name" : "0242869f2486"
          },
          "os_major" : "10",
          "@version" : "1",
          "message" : "000.000.000.000 - - [17/Oct/2022:08:25:18  0000] \"OPTIONS /favicon.svg HTTP/1.1\" 405 559 \"https://example.net/auth/login\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36\" \"111.111.111.111, 222.222.222.222\"",
          "tags" : [
            "beats_input_codec_plain_applied",
            "_geoip_lookup_failure"
          ]

However, my goal is to parse the first IP from the http_x_forwarded_for field, add it as a new field called real_client_ip, and save it to the index. Is there a way to achieve that?

CodePudding user response:

You can add one more grok filter to your Logstash pipeline, after the first one. Since grok matches left to right, %{IP:real_client_ip} will capture the first IP in the http_x_forwarded_for list:

filter {
    grok {
        match => [ "message" , "%{COMBINEDAPACHELOG} %{GREEDYDATA:http_x_forwarded_for}"]
    }
    grok {
        match => [ "http_x_forwarded_for" , "%{IP:real_client_ip}"]
    }
    mutate {
        convert => ["response", "integer"]
        convert => ["bytes", "integer"]
        convert => ["responsetime", "float"]
    }
    geoip {
        source => "clientip"
        target => "geoip"
        add_tag => [ "nginx-geoip" ]
    }
    date {
        match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]
    }
    useragent {
        source => "message"
    }
}
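
If you prefer to avoid a second grok pass, the same result can be had with a single mutate filter. This is an untested sketch that assumes http_x_forwarded_for looks exactly like the output shown above (surrounding quotes and a leading space):

filter {
    mutate {
        # strip the surrounding quotes and spaces left over from the capture
        gsub => [ "http_x_forwarded_for", "[\" ]", "" ]
        # turn the comma-separated list into an array
        split => { "http_x_forwarded_for" => "," }
        # add_field runs after the operations above, so element 0 is
        # already the original client address
        add_field => { "real_client_ip" => "%{[http_x_forwarded_for][0]}" }
    }
}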

PS: I have validated the grok pattern in Kibana but not by running the Logstash pipeline itself, but it should work for your use case.
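
Also note the _geoip_lookup_failure tag in your output: clientip holds the proxy's (here anonymized) address, so the lookup fails. Once real_client_ip is populated, you could point your existing geoip filter at it instead, along these lines:

geoip {
    source => "real_client_ip"
    target => "geoip"
    add_tag => [ "nginx-geoip" ]
}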
