Kafka consumer can't read messages

Time:09-26

 
@Override
public void run() {
    // TODO Auto-generated method stub
    try {
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    Properties props = new Properties();
    props.put("auto.offset.reset", "smallest"); // must be added if you want to read old data
    props.put("zookeeper.connect", "Master:2181");
    props.put("zk.connectiontimeout.ms", "10000");
    props.put("group.id", "test-consumer-group");

    // Create the connection to the cluster
    ConsumerConfig consumerConfig = new ConsumerConfig(props);
    ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put("test", 1); // read the topic "test" with one stream

    Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumerConnector.createMessageStreams(topicCountMap);
    KafkaStream<byte[], byte[]> stream = messageStreams.get("test").get(0);
    ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
    System.out.println("consummer...");
    while (iterator.hasNext()) {
        String message = new String(iterator.next().message());
        System.out.println("received: " + message);
    }
}


The producer runs normally, but the consumer does not receive any data. The "group.id" value "test-consumer-group" comes from the consumer settings in the conf directory. Does the consumer need any other configuration? The program hangs at the line while (iterator.hasNext()) {. How can I get it to continue running?

CodePudding user response:

Help help ~
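
One direction that may be worth trying, offered as a sketch rather than a confirmed fix: with the old ZooKeeper-based high-level consumer, iterator.hasNext() blocks indefinitely by default because consumer.timeout.ms defaults to -1. Setting it to a positive value makes hasNext() throw kafka.consumer.ConsumerTimeoutException when no message arrives within that interval, so the loop can be wrapped in a try/catch and the thread can continue. The class name ConsumerProps, the build helper, and the 5000 ms value below are illustrative assumptions and not part of the original code. Only java.util.Properties is used, so the snippet runs without Kafka on the classpath.

```java
import java.util.Properties;

public class ConsumerProps {
    // Hypothetical helper: collects the settings for the old high-level consumer.
    public static Properties build(String zookeeper, String groupId, int timeoutMs) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        // Only takes effect when the group has no committed offset yet.
        props.put("auto.offset.reset", "smallest");
        // A positive value makes hasNext() throw ConsumerTimeoutException
        // instead of blocking forever; the default of -1 blocks indefinitely.
        props.put("consumer.timeout.ms", Integer.toString(timeoutMs));
        return props;
    }

    public static void main(String[] args) {
        // Values taken from the question; 5000 ms is an arbitrary example.
        Properties props = build("Master:2181", "test-consumer-group", 5000);
        props.list(System.out);
    }
}
```

The resulting Properties object would be passed to new ConsumerConfig(props) as in the question. Note also that auto.offset.reset only applies when the group has no committed offset (or the stored offset is out of range), so a group that has already consumed to the end of the topic will not re-read old data. Trying a fresh group.id is a quick way to check whether that is what is happening here.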