Failed to send requests for topics mykafka1 with correlation ids in [0, 20]

Time:09-19

I recently wrote a kafka_producer test program. After it has been running for a while it fails with the error below. The topic is not newly created:

Fetching topic metadata with correlation id 15 for topics [Set(mykafka1)] from broker [id:1,host:192.168.231.12,port:9092] failed
ERROR DefaultEventHandler: Failed to send requests for topics mykafka1 with correlation ids in [0, 20]

Producer:
import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import kafka.serializer.StringEncoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MyProducer {

  // Build the producer configuration for the given broker list.
  def getProducerConfig(brokerAddr: String): Properties = {
    val props = new Properties()
    props.put("metadata.broker.list", brokerAddr)
    props.put("serializer.class", classOf[ItelogEncoder[Person]].getName)
    props.put("key.serializer.class", classOf[StringEncoder].getName)
    //props.put("up.fetch.max.bytes", "" + 1024 * 1024 * 6)
    //props.put("message.max.bytes", "" + 1024 * 1024 * 4)
    props
  }

  // Note: a new producer is created and closed on every call.
  def sendMessages(topic: String, messages: List[Person], brokerAddr: String) = {
    val producer = new Producer[String, Person](new ProducerConfig(getProducerConfig(brokerAddr)))
    producer.send(messages.map {
      new KeyedMessage[String, Person](topic, "Iteblog", _)
    }: _*)
    producer.close()
  }

  def main(args: Array[String]) {
    val Array(brokerAddr, topic) = args

    val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    for (i <- 0.to(10000)) {
      val data = List(Person("xiaoming" + i, 23), Person("princess-themed" + i, 24), Person("xiaoliu" + i, 26))
      sendMessages(topic, data, brokerAddr)
    }
  }
}

case class Person(var name: String, var age: Int)


Could someone more experienced please help? Thanks in advance from a beginner.

CodePudding user response:

I ran into the same error just a few days ago; I hope this helps.
The problem is in this part of the code (a new producer instance is created every time a message is sent):
def sendMessages(topic: String, messages: List[Person], brokerAddr: String) = {
  val producer = new Producer[String, Person](new ProducerConfig(getProducerConfig(brokerAddr)))
  producer.send(messages.map {
    new KeyedMessage[String, Person](topic, "Iteblog", _)
  }: _*)
  producer.close()
}

CodePudding user response:

val producer = new Producer[String, Person](new ProducerConfig(props))
for (j <- 0.to(10000)) {
  producer.send(data.map {
    new KeyedMessage[String, Person](topic, "Iteblog", _)
  }: _*)
  producer.close()
}


If the producer is created outside the for loop like this, the program fails with:
Exception in thread "main" kafka.producer.ProducerClosedException: producer already closed
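
The exception above is consistent with close() being called inside the loop: the first iteration closes the producer, so the second send() runs against a producer that is already closed. A minimal sketch of one way to combine both replies, creating the producer once and closing it once after the loop, might look like the following. It assumes the same old Scala producer API (kafka.producer.Producer) used in the question. To keep it self-contained it sends plain String payloads with StringEncoder instead of Person with ItelogEncoder, and the names ReuseProducerSketch and producerProps are hypothetical.

```scala
import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import kafka.serializer.StringEncoder

object ReuseProducerSketch {

  // Hypothetical helper: minimal settings for the old Scala producer.
  def producerProps(brokerAddr: String): Properties = {
    val props = new Properties()
    props.put("metadata.broker.list", brokerAddr)
    props.put("serializer.class", classOf[StringEncoder].getName)
    props.put("key.serializer.class", classOf[StringEncoder].getName)
    props
  }

  def main(args: Array[String]): Unit = {
    val Array(brokerAddr, topic) = args

    // Create the producer once, outside the loop, and reuse it for every batch.
    val producer = new Producer[String, String](new ProducerConfig(producerProps(brokerAddr)))
    try {
      for (i <- 0 to 10000) {
        // Assumption: String payloads stand in for the Person objects of the question.
        val data = List("xiaoming" + i, "xiaoliu" + i)
        val batch = data.map(msg => new KeyedMessage[String, String](topic, "Iteblog", msg))
        producer.send(batch: _*)
      }
    } finally {
      // Close exactly once, after all batches have been sent (or on failure).
      producer.close()
    }
  }
}
```

The try/finally is a design choice rather than a requirement: it releases the producer's connections even if a send() throws partway through the loop.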