Home > Back-end >  Logstash duplicate data in elasticsearch
Logstash duplicate data in elasticsearch

Time:12-03

Can someone help me with logstash. I want to sync PostgreSQL with elastic the problem is the current setting duplicate data in es. I know creating a field "id" can help but still logstash recreate the whole index.

SQL

SELECT id, title, description, type,"updatedAt" FROM public.videos WHERE ("updatedAt" > :sql_last_value AND "updatedAt" < NOW())
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/postgresql-jdbc.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "jdbc:postgresql://{POSTGRES_URL}"
    jdbc_user => "${POSTGRES_USERNAME}"
    jdbc_password => "${POSTGRES_PASSWORD}"
    jdbc_paging_enabled => true
    tracking_column => "updatedAt"
    tracking_column_type => "timestamp"
    use_column_value => true
    schedule => "*/5 * * * * *"
    statement_filepath => "/usr/share/logstash/sql/video.sql"
    tags => "video"
  }

output {
  if "video" in [tags] {
    elasticsearch {
      hosts => ["${ES_HOSTS}"]
      index => "video"
      # document_id => "%{[@metadata][_id]}"
      user => "${ES_USER}"
      password => "${ES_PASSWORD}"
      cacert => '/etc/logstash/certificates/ca.crt'
      doc_as_upsert => true
    }
  }
}

Settings from offical documentation for mysql

  jdbc {
    jdbc_driver_library => "<path>/mysql-connector-java-8.0.16.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://<MySQL host>:3306/es_db"
    jdbc_user => <my username>
    jdbc_password => <my password>
    jdbc_paging_enabled => true
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *"
    statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"
  }
<iframe name="sif1" sandbox="allow-forms allow-modals allow-scripts" frameborder="0"></iframe>

CodePudding user response:

Regarding your input section, you need to make sure that your tracking_column is actually a real field that can be tracked:

tracking_column => "updatedAt"

Your output section should look like this instead. It is important to properly set the document_id and [@metadata][_id] has no value in random value, hence the duplicates.

elasticsearch {
  hosts => ["${ES_HOSTS}"]
  index => "video"
  document_id => "%{id}"                   <---- change this line
  user => "${ES_USER}"
  password => "${ES_PASSWORD}"
  cacert => '/etc/logstash/certificates/ca.crt'
}
  • Related