How to reuse added field in output with logstash


My use case is simple. I have Kafka as input and several indexes in Elasticsearch, where each index name matches its topic name (topic name === index name). The index names correspond to the entities we use in our application, such as "buildings", "cars", and "bus" (just as examples).

input {
    kafka {
        bootstrap_servers => "kafka:29092"
        topics => ['cars', 'bus']
        decorate_events => true
        codec => 'json'
    }
}

filter {
    if [@metadata][kafka][topic] == 'cars' {
        mutate {
            convert => {
                "car_id" => "integer"
            }
            add_field => {
                'id' => "%{car_id}"
            }
        }
    }

    if [@metadata][kafka][topic] == 'bus' {
        mutate {
            convert => {
                "bus_id" => "integer"
            }
            add_field => {
                'id' => "%{bus_id}"
            }
        }
    }
}

output {
    if [@metadata][kafka][topic] == 'cars' {
        elasticsearch {
            hosts => "elasticsearch:9200"
            user => "${ELASTICSEARCH_USERNAME}"
            password => "${ELASTICSEARCH_PASSWORD}"
            index => "%{[@metadata][kafka][topic]}"
            doc_as_upsert => true
            action => 'update'
            document_id => '%{car_id}'
        }

        if [isDelete] {
            elasticsearch {
                hosts => "elasticsearch:9200"
                user => "${ELASTICSEARCH_USERNAME}"
                password => "${ELASTICSEARCH_PASSWORD}"
                index => "%{[@metadata][kafka][topic]}"
                action => 'delete'
                document_id => '%{car_id}'
            }
        }
    }

    if [@metadata][kafka][topic] == 'bus' {
        elasticsearch {
            hosts => "elasticsearch:9200"
            user => "${ELASTICSEARCH_USERNAME}"
            password => "${ELASTICSEARCH_PASSWORD}"
            index => "%{[@metadata][kafka][topic]}"
            doc_as_upsert => true
            action => 'update'
            document_id => '%{bus_id}'
        }

        if [isDelete] {
            elasticsearch {
                hosts => "elasticsearch:9200"
                user => "${ELASTICSEARCH_USERNAME}"
                password => "${ELASTICSEARCH_PASSWORD}"
                index => "%{[@metadata][kafka][topic]}"
                action => 'delete'
                document_id => '%{bus_id}'
            }
        }
    }
}

To update or delete documents from Logstash, I need their id. But as you can see, I don't want to end up with fifty conditions, one per entity; I'd rather factorize.
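
For context, a hypothetical event on the "cars" topic might look like this (only car_id and isDelete matter here; the other field is purely illustrative):

{
    "car_id": 42,
    "model": "sedan",
    "isDelete": false
}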

I would like to reuse the "id" field I created in the filter section as the document_id in the output.

Do you have any idea how I could do it?

CodePudding user response:

You can do it like this: use a translate filter to map each topic name to the name of the field that holds the id, then a short ruby filter to copy that field's value into a common "id" field:

input {
    kafka {
        bootstrap_servers => "kafka:29092"
        topics => ['cars', 'bus']
        decorate_events => true
        codec => 'json'
    }
}

filter {
    # Map the topic name to the name of the field that holds the document id
    translate {
       source => "[@metadata][kafka][topic]"
       target => "[@metadata][id_field]"
       dictionary => {
          "cars" => "car_id"
          "bus" => "bus_id"
       }
       fallback => "no_id"
    }
    # Look up that field on the event and copy its value into a common [id] field
    ruby {
        code => "event.set('id', event.get(event.get('[@metadata][id_field]')))"
    }
}

output {
    if [isDelete] {
        elasticsearch {
            hosts => "elasticsearch:9200"
            user => "${ELASTICSEARCH_USERNAME}"
            password => "${ELASTICSEARCH_PASSWORD}"
            index => "%{[@metadata][kafka][topic]}"
            action => 'delete'
            document_id => '%{id}'
        }
    } else {
        elasticsearch {
            hosts => "elasticsearch:9200"
            user => "${ELASTICSEARCH_USERNAME}"
            password => "${ELASTICSEARCH_PASSWORD}"
            index => "%{[@metadata][kafka][topic]}"
            doc_as_upsert => true
            action => 'update'
            document_id => '%{id}'
        }
    }
}
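
With this approach, supporting a new entity no longer requires any extra conditionals in the output. As a sketch, assuming a hypothetical "buildings" topic whose documents carry a "building_id" field, the only changes would be one entry in the topics list and one entry in the dictionary:

input {
    kafka {
        bootstrap_servers => "kafka:29092"
        # subscribe to the new topic as well
        topics => ['cars', 'bus', 'buildings']
        decorate_events => true
        codec => 'json'
    }
}

filter {
    translate {
       source => "[@metadata][kafka][topic]"
       target => "[@metadata][id_field]"
       dictionary => {
          "cars" => "car_id"
          "bus" => "bus_id"
          # hypothetical new entity: topic name => id field name
          "buildings" => "building_id"
       }
       fallback => "no_id"
    }
    # unchanged: copies the mapped field's value into [id]
    ruby {
        code => "event.set('id', event.get(event.get('[@metadata][id_field]')))"
    }
}

The output section stays exactly as above, since it only references %{id} and the topic metadata.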