My use case is simple. I have Kafka as the input and several indexes in Elasticsearch, one per topic (topic name === index name). The index names match the entities we use in our application, such as "buildings", "cars", and "bus" (just as examples).
input {
  kafka {
    bootstrap_servers => "kafka:29092"
    topics => ['cars', 'bus']
    decorate_events => true
    codec => 'json'
  }
}
filter {
  if [@metadata][kafka][topic] == 'cars' {
    mutate {
      convert => {
        "car_id" => "integer"
      }
      add_field => {
        'id' => "%{car_id}"
      }
    }
  }
  if [@metadata][kafka][topic] == 'bus' {
    mutate {
      convert => {
        "bus_id" => "integer"
      }
      add_field => {
        'id' => "%{bus_id}"
      }
    }
  }
}
output {
  if [@metadata][kafka][topic] == 'cars' {
    elasticsearch {
      hosts => "elasticsearch:9200"
      user => "${ELASTICSEARCH_USERNAME}"
      password => "${ELASTICSEARCH_PASSWORD}"
      index => "%{[@metadata][kafka][topic]}"
      doc_as_upsert => true
      action => 'update'
      document_id => '%{car_id}'
    }
    # Delete the document when the event carries a truthy isDelete field.
    if [isDelete] {
      elasticsearch {
        hosts => "elasticsearch:9200"
        user => "${ELASTICSEARCH_USERNAME}"
        password => "${ELASTICSEARCH_PASSWORD}"
        index => "%{[@metadata][kafka][topic]}"
        action => 'delete'
        document_id => '%{car_id}'
      }
    }
  }
  if [@metadata][kafka][topic] == 'bus' {
    elasticsearch {
      hosts => "elasticsearch:9200"
      user => "${ELASTICSEARCH_USERNAME}"
      password => "${ELASTICSEARCH_PASSWORD}"
      index => "%{[@metadata][kafka][topic]}"
      doc_as_upsert => true
      action => 'update'
      document_id => '%{bus_id}'
    }
    if [isDelete] {
      elasticsearch {
        hosts => "elasticsearch:9200"
        user => "${ELASTICSEARCH_USERNAME}"
        password => "${ELASTICSEARCH_PASSWORD}"
        index => "%{[@metadata][kafka][topic]}"
        action => 'delete'
        document_id => '%{bus_id}'
      }
    }
  }
}
To update or delete documents from Logstash, I need to use their id. But as you can see, I don't want to write 50 conditions, one per entity; I would rather factor this out.
I would like to reuse the "id" field I created in the filter section as the document_id in the output.
Do you have any idea how I could do that?
CodePudding user response:
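You can do it like this: use a translate filter to map the topic name to the name of its id field, then copy that field's value into a generic "id" field with a ruby filter.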
input {
  kafka {
    bootstrap_servers => "kafka:29092"
    topics => ['cars', 'bus']
    decorate_events => true
    codec => 'json'
  }
}
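filter {
  # Map the Kafka topic to the name of the field holding that entity's id.
  # With decorate_events enabled, the topic is at [@metadata][kafka][topic].
  translate {
    source => "[@metadata][kafka][topic]"
    target => "[@metadata][id_field]"
    dictionary => {
      "cars" => "car_id"
      "bus" => "bus_id"
    }
    fallback => "no_id"
  }
  # Copy the value of that field into a generic "id" field.
  ruby {
    code => "event.set('id', event.get(event.get('[@metadata][id_field]')))"
  }
}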
output {
  # Assumes isDelete is a boolean field that is only truthy for deletions.
  if [isDelete] {
    elasticsearch {
      hosts => "elasticsearch:9200"
      user => "${ELASTICSEARCH_USERNAME}"
      password => "${ELASTICSEARCH_PASSWORD}"
      index => "%{[@metadata][kafka][topic]}"
      action => 'delete'
      document_id => '%{id}'
    }
  } else {
    elasticsearch {
      hosts => "elasticsearch:9200"
      user => "${ELASTICSEARCH_USERNAME}"
      password => "${ELASTICSEARCH_PASSWORD}"
      index => "%{[@metadata][kafka][topic]}"
      doc_as_upsert => true
      action => 'update'
      document_id => '%{id}'
    }
  }
}
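If it helps to see what the translate and ruby filters do together, here is a rough model of the lookup in plain Ruby, outside Logstash. It is only a sketch: the event is modelled as a plain Hash rather than a Logstash event, and the names ID_FIELDS and with_generic_id are made up for illustration.

```ruby
# Hypothetical topic-to-field map, mirroring the translate dictionary.
ID_FIELDS = { 'cars' => 'car_id', 'bus' => 'bus_id' }.freeze

# Return a copy of the event with a generic 'id' copied from the
# topic-specific field, or the event unchanged when no id can be resolved.
def with_generic_id(event, topic)
  id_field = ID_FIELDS[topic]
  return event if id_field.nil? || event[id_field].nil?

  event.merge('id' => event[id_field])
end

p with_generic_id({ 'car_id' => 42, 'color' => 'red' }, 'cars')
p with_generic_id({ 'bus_id' => 7 }, 'bus')
p with_generic_id({ 'name' => 'x' }, 'buildings')
```

Unlike the one-line ruby filter, this sketch leaves the event alone when the topic is unknown or the id field is missing. You may want a similar guard in the pipeline so that such events do not reach Elasticsearch without a usable id.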