Spark Streaming: running multiple input DStreams in parallel


The Spark website shows how to run multiple receivers (corresponding to multiple input DStreams) in parallel with the following code:
int numStreams = 5;
List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<>(numStreams);
for (int i = 0; i < numStreams; i++) {
    kafkaStreams.add(KafkaUtils.createStream(...));
}
JavaPairDStream<String, String> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
unifiedStream.print();
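To picture what the union above does, here is a plain-Java analogy. It is not Spark code, and ReceiverUnionDemo and runReceivers are hypothetical names: each receiver runs on its own thread and pushes records into one shared queue, which stands in for the unified DStream that downstream processing reads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Plain-Java analogy of unioning several receiver streams (not Spark code).
// ReceiverUnionDemo and runReceivers are hypothetical names.
class ReceiverUnionDemo {

    // Starts numStreams "receiver" threads; each pushes messagesPerStream records
    // into one shared queue that stands in for the unified DStream.
    static List<String> runReceivers(int numStreams, int messagesPerStream) {
        BlockingQueue<String> unified = new LinkedBlockingQueue<>();
        List<Thread> receivers = new ArrayList<>(numStreams);
        for (int i = 0; i < numStreams; i++) {
            final int streamId = i;
            Thread receiver = new Thread(() -> {
                for (int m = 0; m < messagesPerStream; m++) {
                    unified.add("stream" + streamId + "-msg" + m);
                }
            }, "receiver-" + i);
            receivers.add(receiver);
            receiver.start(); // receivers run concurrently, not one after another
        }
        try {
            for (Thread receiver : receivers) {
                receiver.join(); // wait until every receiver has delivered its records
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for receivers", e);
        }
        // Collect everything that arrived, like one batch of the unified stream.
        List<String> batch = new ArrayList<>();
        unified.drainTo(batch);
        return batch;
    }

    public static void main(String[] args) {
        List<String> batch = runReceivers(5, 3);
        System.out.println("records in unified batch: " + batch.size());
    }
}
```

One practical consequence, stated in the Spark Streaming programming guide: each receiver occupies a core for as long as the application runs, so the application needs more cores than receivers (when running locally, a master of local[n] with n greater than the number of receivers); otherwise data is received but never processed.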

My program uses Kafka as the source. A single input DStream works fine. With multiple DStreams, testing shows that data from both inputs arrives in the unified DStream, but the problem is that the program runs only once: it receives data one time and then receives nothing afterwards. My code is as follows:

String groupId = args[0];
String zookeepers = args[1];
String topics = "tpsN5a";
Integer numPartitions = Integer.parseInt(args[3]);

Map<String, Integer> topicsMap = new HashMap<>();
for (String topic : topics.split(",")) {
    topicsMap.put(topic, numPartitions);
}
// Batch interval: how often a batch is computed
Duration batchInterval = Durations.seconds(2);
SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaConsumerWordCount");

JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);

JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils
    .createStream(ssc, zookeepers, groupId, topicsMap, StorageLevel.MEMORY_AND_DISK_SER());

String topics2 = "tpsN5b";
Map<String, Integer> topicsMap2 = new HashMap<>();
topicsMap2.put(topics2, numPartitions);
JavaPairReceiverInputDStream<String, String> kafkaStream2 = KafkaUtils
    .createStream(ssc, zookeepers, groupId, topicsMap2, StorageLevel.MEMORY_AND_DISK_SER());

List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<>(2);
kafkaStreams.add(kafkaStream);
kafkaStreams.add(kafkaStream2);

ssc.checkpoint("/spark/stream/checkpoint/d1");

JavaPairDStream<String, String> unifiedStream = ssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));

JavaDStream<String> lines = unifiedStream // kafkaStream
    .map(new Function<Tuple2<String, String>, String>() {
        @Override
        public String call(Tuple2<String, String> arg0) throws Exception {
            logger.warn(Thread.currentThread().getName() + " msg1:" + arg0._1() + " | msg2:" + arg0._2());
            return arg0._2();
        }
    });

How can I solve this? When multiple input DStreams receive data in parallel, how do I keep the streaming application receiving data continuously instead of receiving only once?

User response:

I am using Spark 1.3.1 with the write-ahead log enabled. The write-ahead log does contain the messages sent to Kafka, but the program itself does not seem to receive them.

User response:

Was this solved? I have run into a similar problem.

User response:

I ran it for a while and it works fine. At the end of the program, add the following code:
ssc.start();
ssc.awaitTermination();
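Why this helps: in Spark Streaming, defining DStreams and their transformations only sets up the computation. Nothing is received or processed until ssc.start() is called, and ssc.awaitTermination() keeps the driver's main thread waiting while the streaming computation runs. The sketch below is a hypothetical plain-Java analogy, not Spark code: MiniStreamingContext, foreachBatch and the fixed batchesToRun count are invented for illustration (a real streaming context keeps producing batches until it is stopped).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

// Hypothetical, simplified stand-in for a streaming context (not a Spark API).
// It shows two things: registering per-batch logic runs nothing until start(),
// and the caller must block in awaitTermination() while batches are processed.
class MiniStreamingContext {
    private final List<IntConsumer> outputOps = new ArrayList<>();
    private final int batchesToRun; // assumption: a real context runs until stopped
    private Thread scheduler;

    MiniStreamingContext(int batchesToRun) {
        this.batchesToRun = batchesToRun;
    }

    // Analogous to registering an output operation: only records the logic.
    void foreachBatch(IntConsumer op) {
        outputOps.add(op);
    }

    // Analogous to ssc.start(): begins generating batches on a background thread.
    void start() {
        scheduler = new Thread(() -> {
            for (int batch = 0; batch < batchesToRun; batch++) {
                for (IntConsumer op : outputOps) {
                    op.accept(batch);
                }
            }
        }, "batch-scheduler");
        scheduler.start();
    }

    // Analogous to ssc.awaitTermination(): blocks the calling (driver) thread
    // until batch processing has finished.
    void awaitTermination() {
        if (scheduler == null) {
            throw new IllegalStateException("start() has not been called");
        }
        try {
            scheduler.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        MiniStreamingContext ctx = new MiniStreamingContext(3);
        List<Integer> processed = new ArrayList<>();
        ctx.foreachBatch(batch -> processed.add(batch));
        System.out.println("before start(): " + processed);
        ctx.start();
        ctx.awaitTermination();
        System.out.println("after awaitTermination(): " + processed);
    }
}
```

The thread does not say exactly what was missing from the original program; the analogy only illustrates why both calls belong at the end of the driver's main method.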

User response:

Could you tell me whether your multiple input DStreams use the same groupId? In other words, do they all belong to the same consumer group?
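The question matters because of how Kafka consumer groups work: consumers that share a group id divide a topic's partitions among themselves, so each message goes to only one member, whereas consumers in different groups each receive every message. In the question's code both createStream calls pass the same groupId but subscribe to different topics (tpsN5a and tpsN5b); the thread does not establish whether sharing the group caused the problem. The sketch below is a simplified plain-Java model of partition assignment within one group, not the Kafka client API; the round-robin strategy and the ConsumerGroupDemo/assign names are assumptions for illustration, and Kafka's actual assignment strategy may differ.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of consumer-group semantics (not the Kafka client API):
// within one group, each partition of a topic is owned by exactly one member,
// so members share the work; a separate group would receive every partition.
class ConsumerGroupDemo {

    // Assumed round-robin assignment of partitions 0..numPartitions-1 to members.
    static Map<String, List<Integer>> assign(List<String> members, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String member : members) {
            assignment.put(member, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            String owner = members.get(p % members.size());
            assignment.get(owner).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Two receivers in the SAME group split the topic's partitions...
        System.out.println(assign(List.of("receiver-1", "receiver-2"), 4));
        // ...while a receiver alone in its own group gets all of them.
        System.out.println(assign(List.of("receiver-1"), 4));
    }
}
```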