int numStreams = 5;
List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<>(numStreams);
for (int i = 0; i < numStreams; i++) {
    kafkaStreams.add(KafkaUtils.createStream(...));
}
JavaPairDStream<String, String> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
unifiedStream.print();
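The union call above splits the list into its head (`get(0)`) and tail (`subList(1, size())`), matching the `JavaStreamingContext.union(first, rest)` signature. A minimal plain-Java sketch of that head/tail split (the stream names here are placeholders, not real DStreams):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class HeadTailSplit {
    public static void main(String[] args) {
        // Placeholder stand-ins for the DStreams in the snippet above.
        List<String> streams = new ArrayList<>(Arrays.asList("s0", "s1", "s2"));

        // Head: the first stream, passed as union()'s first argument.
        String first = streams.get(0);
        // Tail: all remaining streams, passed as union()'s second argument.
        List<String> rest = streams.subList(1, streams.size());

        System.out.println(first); // s0
        System.out.println(rest);  // [s1, s2]
    }
}
```

The split only works for a non-empty list, which is why the snippet assumes at least one stream in kafkaStreams.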
In my program I use Kafka as the source. With a single input DStream there is no problem. With multiple input DStreams I have tested that data from both inputs arrives in the unified DStream, but the problem is that the program receives data only once; after that it never receives anything again. My code is as follows:
String groupId = args[0];
String zookeepers = args[1];
String topics = "tpsN5a";
Integer numPartitions = Integer.parseInt(args[3]);
Map<String, Integer> topicsMap = new HashMap<>();
for (String topic : topics.split(",")) {
    topicsMap.put(topic, numPartitions);
}
// how long each round of statistics covers
Duration batchInterval = Durations.seconds(2);
SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaConsumerWordCount");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils
    .createStream(ssc, zookeepers, groupId, topicsMap, StorageLevel.MEMORY_AND_DISK_SER());
String topics2 = "tpsN5b";
Map<String, Integer> topicsMap2 = new HashMap<>();
topicsMap2.put(topics2, numPartitions);
JavaPairReceiverInputDStream<String, String> kafkaStream2 = KafkaUtils
    .createStream(ssc, zookeepers, groupId, topicsMap2, StorageLevel.MEMORY_AND_DISK_SER());
List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<>(2);
kafkaStreams.add(kafkaStream);
kafkaStreams.add(kafkaStream2);
ssc.checkpoint("/spark/stream/checkpoint/d1");
JavaPairDStream<String, String> unifiedStream = ssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
JavaDStream<String> lines = unifiedStream // kafkaStream
    .map(new Function<Tuple2<String, String>, String>() {
        @Override
        public String call(Tuple2<String, String> arg0) throws Exception {
            logger.warn(Thread.currentThread().getName() + " msg1:" + arg0._1 + " | msg2:" + arg0._2);
            return arg0._2();
        }
    });
How can I solve the problem described above? When using multiple input DStreams to receive data in parallel, how can the streaming application keep receiving data continuously instead of receiving it only once?
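The topic-map construction near the top of the listing (a comma-separated topics string turned into the `Map<String, Integer>` of topic name to consumer-thread count that `KafkaUtils.createStream` expects) can be exercised in isolation with plain Java; the topic names and partition count below are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

public class TopicsMapSketch {
    public static void main(String[] args) {
        // Hypothetical comma-separated topic list and per-topic thread count.
        String topics = "tpsN5a,tpsN5b";
        Integer numPartitions = 2;

        // Map each topic name to its number of consumer threads -- the
        // Map<String, Integer> shape the receiver-based Kafka API expects.
        Map<String, Integer> topicsMap = new HashMap<>();
        for (String topic : topics.split(",")) {
            topicsMap.put(topic, numPartitions);
        }

        System.out.println(topicsMap.size()); // 2
    }
}
```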
CodePudding user response:
I am using Spark 1.3.1 with the write-ahead log enabled. The write-ahead log does receive the messages sent to Kafka, but the program itself still fails to receive them.

CodePudding user response:
Has the problem been solved? I ran into a similar problem.

CodePudding user response:
Give it a run; it should be fine once you add the following code at the end of the program:
ssc.start();
ssc.awaitTermination();
CodePudding user response:
Could you tell me whether your multiple input DStreams use the same groupId? In other words, do your multiple input DStreams belong to the same consumer group?