Logstash Jdbc plugin filling more data in elasticsearch than the actual data, keeps on running

Time:06-25

Logstash keeps running indefinitely and I have to stop the process manually; it just keeps adding documents to the Elasticsearch index. I need exactly as many documents in the index as there are rows in my DB table.

Here's my logstash config:

input {
  jdbc {
    jdbc_driver_library => "/correct_path/java/mysql-connector-java-8.0.27.jar" 
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/my_db"
    jdbc_user => "user" 
    jdbc_password => "password" 
    jdbc_paging_enabled => true
    schedule => "*/5 * * * * *"
    statement => 'select * from my_table'
  }
}

output {
  elasticsearch {
    user => "test"
    password => "test"
    hosts => ["localhost:9200"]
    index => "my_index"
  }
  stdout { codec => "rubydebug" }
}

CodePudding user response:

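This happens because the query fetches every row of the table each time the schedule fires (a six-field schedule such as "*/5 * * * * *" is, as far as I know, read with a leading seconds field, so it fires every 5 seconds). Also, you have not set a custom document id in the elasticsearch output, so Elasticsearch generates a new id for every event. Each run therefore indexes the same rows again as new documents, and the index fills up with duplicates that differ only in their id.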
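To illustrate the id part, here is a minimal sketch of the output section with a fixed document id. It assumes the table has a unique primary-key column named id, which is a hypothetical name not given in the question; substitute your own key column (the jdbc input lowercases column names by default).

output {
  elasticsearch {
    user => "test"
    password => "test"
    hosts => ["localhost:9200"]
    index => "my_index"
    # Assumption: "id" is the table's primary-key column (hypothetical name).
    # Using it as the document id makes a re-read row overwrite its existing
    # document instead of creating a new one.
    document_id => "%{id}"
  }
  stdout { codec => "rubydebug" }
}

With this in place, the document count should stay equal to the number of distinct key values, even if the same row is fetched more than once.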

You can use the :sql_last_value parameter, which stores the time of the last run, and add a WHERE condition on created_date or updated_date to your query. The first run then fetches all rows from the DB, and every later run fetches only the rows that were created or updated since the previous run.

input {
  jdbc {
    jdbc_driver_library => "/correct_path/java/mysql-connector-java-8.0.27.jar" 
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/my_db"
    jdbc_user => "user" 
    jdbc_password => "password" 
    jdbc_paging_enabled => true
    schedule => "*/5 * * * * *"
    statement => 'select * from my_table where created_date > :sql_last_value or updated_date > :sql_last_value'
  }
}

output {
  elasticsearch {
    user => "test"
    password => "test"
    hosts => ["localhost:9200"]
    index => "my_index"
  }
  stdout { codec => "rubydebug" }
}
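As a possible variation, :sql_last_value can track a column value instead of the time of the last run. The sketch below shows only the jdbc options that change and assumes updated_date is a timestamp column that is also set when a row is first inserted (the column name comes from the query above; whether your table has it is an assumption).

input {
  jdbc {
    # Driver, connection string and credentials stay the same as in the config above.
    jdbc_paging_enabled => true
    schedule => "*/5 * * * * *"
    # Track the highest updated_date seen so far instead of the last run time.
    use_column_value => true
    tracking_column => "updated_date"
    tracking_column_type => "timestamp"
    # Ordering by the tracking column keeps the stored value at the newest row read.
    statement => 'select * from my_table where updated_date > :sql_last_value order by updated_date asc'
  }
}

The tracked value is persisted between restarts in the file given by last_run_metadata_path, so deleting that file makes the next run start from the beginning again.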

PS: I'm not an SQL expert, so the query may need adjusting, but I hope it conveys the idea.
