I am trying to create two pipelines in Logstash, first-pipeline and second-pipeline, both listening for Beats events on the same port, but the second pipeline fails to start with an "address already in use" error. The reason I am using two pipelines is that I want pipeline.workers: 1 only for the selected indexes where the order of the log events matters.
ERROR:
Pipeline_id:second-pipeline
Plugin: <LogStash::Inputs::Beats host=>"127.0.0.1", port=>5044,
id=>"7c07a66c7959c1734f6aead8ca456bc7c3b086aafb7b5bd4882ee45e0f3c9fc5",
enable_metric=>true, codec=><LogStash::Codecs::Plain id=>"plain_4d22b75f-e478-4fbc-
b5fe-27ae02ac486b", enable_metric=>true, charset=>"UTF-8">, ssl=>false,
add_hostname=>true, ssl_verify_mode=>"none", ssl_peer_metadata=>false,
include_codec_tag=>true, ssl_handshake_timeout=>10000, tls_min_version=>1,
tls_max_version=>1.2, cipher_suites=>["TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384",
"TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384", "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256",
"TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256", "TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384",
"TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384", "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256",
"TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256"], client_inactivity_timeout=>60,
executor_threads=>8>
Error: Address already in use
Exception: Java::JavaNet::BindException
Stack: sun.nio.ch.Net.bind0(Native Method)
sun.nio.ch.Net.bind(sun/nio/ch/Net.java:438)
sun.nio.ch.Net.bind(sun/nio/ch/Net.java:430)
sun.nio.ch.ServerSocketChannelImpl.bind(sun/nio/ch/ServerSocketChannelImpl.java:225)
pipelines.yml
- pipeline.id: first-pipeline
  path.config: "/Users/gyrao/Documents/ELK/logstash-6.5.4/config/pipelines/api-address.config"
  pipeline.batch.size: 1
- pipeline.id: second-pipeline
  path.config: "/Users/gyrao/Documents/ELK/logstash-6.5.4/config/pipelines/my-config.config"
  pipeline.workers: 1
  pipeline.batch.size: 1
  queue.type: persisted
  path.queue: "/Users/gyrao/Documents/ELK/logstash-6.5.4/config/queue"
my-config.config
input {
  beats {
    host => "127.0.0.1"
    port => 5044
  }
}
filter {
}
output {
}
api-address.config
input {
  beats {
    host => "127.0.0.1"
    port => 5044
  }
}
filter {
}
output {
}
Answer:
You cannot have two inputs listening on the same port, but you can use the distributor pattern: receive everything in a single input, then send each event to a different downstream pipeline with the configuration you need.
So you can have:
master-pipeline
input {
  beats {
    host => "127.0.0.1"
    port => 5044
  }
}
filter {
}
output {
  if [someField] == "some_value" {
    pipeline {
      send_to => first_pipeline
    }
  } else {
    pipeline {
      send_to => second_pipeline
    }
  }
}
Then individual pipelines should look like this:
input {
  pipeline {
    address => first_pipeline
  }
}
...
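For example, the second pipeline's my-config.config from the question would then just swap its beats input for a pipeline input whose address matches the send_to value above (the empty filter and output blocks are kept as placeholders from the question):
input {
  pipeline {
    address => second_pipeline
  }
}
filter {
}
output {
}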
And your pipelines file:
- pipeline.id: master-pipeline
  path.config: "/Users/gyrao/Documents/ELK/logstash-6.5.4/config/pipelines/master-pipeline.config"
  pipeline.batch.size: 1
- pipeline.id: first-pipeline
  path.config: "/Users/gyrao/Documents/ELK/logstash-6.5.4/config/pipelines/api-address.config"
  pipeline.batch.size: 1
- pipeline.id: second-pipeline
  path.config: "/Users/gyrao/Documents/ELK/logstash-6.5.4/config/pipelines/my-config.config"
  pipeline.workers: 1
  pipeline.batch.size: 1
  queue.type: persisted
  path.queue: "/Users/gyrao/Documents/ELK/logstash-6.5.4/config/queue"
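Note that pipelines.yml is only read when Logstash is started without the -e or -f command-line flags (otherwise it is ignored and Logstash logs a warning), so start it plainly from the install directory, for example:
bin/logstash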
Read more here: https://www.elastic.co/guide/en/logstash/current/pipeline-to-pipeline.html#distributor-pattern