Logstash pipeline pushing the data without transforming it to Elasticsearch

Time:08-17

I'm running the ELK stack (Elasticsearch, Logstash, Kibana) from a docker-compose file. The docker-compose.yml file is very straightforward:

version: '3.8'
services: 
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.3.2
    ports:
      - 9300:9300
      - 9200:9200
    environment:
      - http.cors.enabled=true
      - http.cors.allow-origin=*
      - http.cors.allow-methods=OPTIONS,HEAD,GET,POST,PUT,DELETE
      - http.cors.allow-headers=X-Requested-With,X-Auth-Token,Content-Type,Content-Length,Authorization
      - transport.host=127.0.0.1
      - cluster.name=docker-cluster
      - discovery.type=single-node
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - elasticsearch_data:/usr/share/elasticsearch/data
    networks:
      - share-network
  kibana:
    image: docker.elastic.co/kibana/kibana:7.3.2
    ports:
      - 5601:5601
    networks:
      - share-network
    depends_on:
      - elasticsearch
  logstash:
    build: 
      dockerfile: Dockerfile
      context: .
    env_file:
      - .local.env
    volumes: 
      - ./pipelines/provider_scores.conf:/usr/share/logstash/pipeline/logstash.conf
    ports:
      - 9600:9600
      - 5044:5044
    networks:
      - share-network
    depends_on:
      - elasticsearch
      - kibana
volumes:
  elasticsearch_data:
networks:
  share-network:

The Dockerfile for the Logstash service only installs a few plugins on top of the official Logstash image and copies in the JDBC driver jars:

FROM docker.elastic.co/logstash/logstash:7.3.2

# install dependency
RUN /usr/share/logstash/bin/logstash-plugin install logstash-input-jdbc
RUN /usr/share/logstash/bin/logstash-plugin install logstash-filter-aggregate
RUN /usr/share/logstash/bin/logstash-plugin install logstash-filter-jdbc_streaming
RUN /usr/share/logstash/bin/logstash-plugin install logstash-filter-mutate

# copy lib database jdbc jars
COPY ./drivers/mysql/mysql-connector-java-8.0.11.jar /usr/share/logstash/logstash-core/lib/jars/mysql-connector-java.jar
COPY ./drivers/sql-server/mssql-jdbc-7.4.1.jre11.jar /usr/share/logstash/logstash-core/lib/jars/mssql-jdbc.jar
COPY ./drivers/oracle/ojdbc6-11.2.0.4.jar /usr/share/logstash/logstash-core/lib/jars/ojdbc6.jar
COPY ./drivers/postgres/postgresql-42.2.8.jar /usr/share/logstash/logstash-core/lib/jars/postgresql.jar

And the provider_scores.conf file looks like this:

input {
    jdbc {
        jdbc_driver_library => "${LOGSTASH_JDBC_DRIVER_JAR_LOCATION}"
        jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
        jdbc_connection_string => "jdbc:sqlserver://${DbServer};database=${DataDbName}"
        jdbc_user => "${DataUserName}"
        jdbc_password => "${DataPassword}"
        schedule => "${CronSchedule_Metrics}"
        statement => "
            select pws.ProviderID,
                pws.SpeedScore,
                pws.QualityScore
            from ProviderWeightedOverallScore pws
            order by pws.ProviderID
            "
    }
}
filter {
    aggregate {
        task_id => "%{ProviderID}"
        code => "
             map['providerid'] ||= event.get('ProviderID')
             map['kpi'] ||= []
             map['kpi'] << {
                'speedscore' => event.get('SpeedScore'),
                'qualityscore' => event.get('QualityScore')
                }
             event.cancel()
        "      
        push_previous_map_as_event => true
        timeout => 3
    }
}
output {
    elasticsearch {
        hosts => ["${LOGSTASH_ELASTICSEARCH_HOST}"]
        document_id => "%{providerid}"
        index => "testing-%{+YYYY.MM.dd.HH.mm.ss}"
        action => "update"
        doc_as_upsert => true
    }
    stdout {  }

}

That's my Docker configuration. Everything runs fine, except that the aggregate filter has no effect: the Elasticsearch index is filled with the raw rows, with no transformation applied.

Any idea why the filter section is not transforming the data?

CodePudding user response:

The most common cause is that a Logstash pipeline is processed by multiple worker threads. By default, pipeline.workers is set to the number of CPU cores available on the host.

To work correctly, the aggregate filter must run with a single worker; otherwise, events that share the same task_id may be handled by different worker threads.

So set pipeline.workers: 1 in logstash.yml, or set the environment variable PIPELINE_WORKERS to 1.
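
As a minimal sketch, assuming the docker-compose.yml from the question and the official Logstash Docker image (which maps the PIPELINE_WORKERS environment variable to pipeline.workers), the setting could be passed to the container like this. Only the environment entry is new; the rest of the logstash service is left as in the question.

```yaml
services:
  logstash:
    build:
      dockerfile: Dockerfile
      context: .
    env_file:
      - .local.env
    environment:
      # Run the pipeline with a single worker thread so that every event
      # for a given task_id passes through the same aggregate filter map.
      - PIPELINE_WORKERS=1
```

Adding the line PIPELINE_WORKERS=1 to the existing .local.env file should have the same effect, since docker-compose loads that file into the container's environment.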

CodePudding user response:

I am having the same issue. My aggregate filter doesn't seem to work even after setting pipeline.workers to 1. Do I need to have a mapping in place for the data transformation to work before I feed the data through the pipeline?
