Spark Streaming: running multiple input DStreams in parallel

Time: 09-16

The Spark website suggests running multiple receivers (corresponding to multiple input DStreams) in parallel and unioning them, with code along these lines:

int numStreams = 5;
List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<JavaPairDStream<String, String>>(numStreams);
for (int i = 0; i < numStreams; i++) {
  kafkaStreams.add(KafkaUtils.createStream(...));
}
JavaPairDStream<String, String> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
unifiedStream.print();
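
A caveat from the same Spark documentation (not stated in the original post): each createStream call allocates a receiver that permanently occupies one core, so the application must be given more cores than it has receivers, otherwise the streams receive data but nothing gets processed. A minimal sketch for local testing (the app name and variable are hypothetical):

// With 5 receivers, at least 6 threads are needed:
// one per receiver, plus at least one for processing the received data.
SparkConf conf = new SparkConf()
    .setMaster("local[6]")
    .setAppName("MultiReceiverExample"); // hypothetical app name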


In my program I use Kafka as the source. With a single input DStream there is no problem. With multiple DStreams, I tested and both input DStreams do receive data, but the problem is that the program effectively runs only once: it receives data one time and then never receives anything again. My code is as follows:

 
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

String groupId = args[0];
String zookeepers = args[1];
String topics = "tpsN5a";
Integer numPartitions = Integer.parseInt(args[3]);

Map<String, Integer> topicsMap = new HashMap<String, Integer>();
for (String topic : topics.split(",")) {
  topicsMap.put(topic, numPartitions);
}
// how long each statistics batch covers
Duration batchInterval = Durations.seconds(2);
SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaConsumerWordCount");

JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);

JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils
    .createStream(ssc, zookeepers, groupId, topicsMap, StorageLevel.MEMORY_AND_DISK_SER());

String topics2 = "tpsN5b";
Map<String, Integer> topicsMap2 = new HashMap<String, Integer>();
topicsMap2.put(topics2, numPartitions);
JavaPairReceiverInputDStream<String, String> kafkaStream2 = KafkaUtils
    .createStream(ssc, zookeepers, groupId, topicsMap2, StorageLevel.MEMORY_AND_DISK_SER());

List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<JavaPairDStream<String, String>>(2);
kafkaStreams.add(kafkaStream);
kafkaStreams.add(kafkaStream2);

ssc.checkpoint("/spark/stream/checkpoint/d1");

JavaPairDStream<String, String> unifiedStream = ssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));

JavaDStream<String> lines = unifiedStream // kafkaStream
    .map(new Function<Tuple2<String, String>, String>() {
      @Override
      public String call(Tuple2<String, String> arg0) throws Exception {
        // logger: an org.slf4j.Logger defined elsewhere in the class
        logger.warn(Thread.currentThread().getName() + " msg1:" + arg0._1() + " | msg2:" + arg0._2());
        return arg0._2();
      }
    });



How can I solve the problem described above, so that when multiple input DStreams receive data in parallel, the streaming application keeps receiving data continuously instead of receiving it only once?

CodePudding user response:

I am using Spark 1.3.1 with the write-ahead log enabled. The write-ahead log does record the data: I can see the messages Kafka sends, but the program itself never receives them.
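
For context, a minimal sketch of how the receiver write-ahead log is typically switched on in Spark 1.3.x (assumed configuration; the reply above does not show it):

// Assumed setup: the WAL needs a checkpoint directory on fault-tolerant
// storage plus this configuration flag.
SparkConf sparkConf = new SparkConf()
    .setAppName("JavaKafkaConsumerWordCount")
    .set("spark.streaming.receiver.writeAheadLog.enable", "true");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
ssc.checkpoint("/spark/stream/checkpoint/d1"); // WAL segments are written under this directory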

CodePudding user response:

Has this been solved? I have run into a similar problem.

CodePudding user response:

It has been running for a while now with no problem. At the end of the program, add the following code:
ssc.start();
ssc.awaitTermination();
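
In other words, a sketch of the fixed tail of the driver program (variable names follow the code in the question):

// ... build kafkaStream, kafkaStream2 and unifiedStream as above ...
ssc.start();            // actually launches the receivers and the batch processing
ssc.awaitTermination(); // blocks the driver; without these two calls the DStream
                        // graph is defined but the application never keeps running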

CodePudding user response:

Could you tell me whether your multiple input DStreams use the same groupId? In other words, do your multiple input DStreams belong to the same consumer group?
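
If the shared consumer group turns out to matter here, one variation to test (my assumption; the thread does not confirm this is the cause) is to give each receiver its own Kafka consumer group:

// Hypothetical variation: distinct groupIds so the two receivers consume
// their topics in independent consumer groups.
JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils
    .createStream(ssc, zookeepers, groupId + "-a", topicsMap, StorageLevel.MEMORY_AND_DISK_SER());
JavaPairReceiverInputDStream<String, String> kafkaStream2 = KafkaUtils
    .createStream(ssc, zookeepers, groupId + "-b", topicsMap2, StorageLevel.MEMORY_AND_DISK_SER());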