Public class ExceptionUser {
Private static Logger Logger=Logger. GetLogger (ExceptionUser. Class);
Private static final String KAFKA_TOPIC="TopicA";
Public static void main (String [] args) throws the Exception {
System. SetProperty (" hadoop. Home. Dir ", "D: \ \ checkpoint \ \ hadoop - common - 2.2.0 - bin - master");
SparkConf conf=new SparkConf (). SetMaster (" local [*] "). The setAppName (" WordsCount ");
JavaStreamingContext JSSC=new JavaStreamingContext (conf, Durations. Milliseconds (4000));
MapKafkaParams=new HashMap (a);
KafkaParams. Put (" the bootstrap. The servers, "" 127.0.0.1:9092");
KafkaParams. Put (" key. The deserializer ", "org.apache.kafka.com mon. Serialization. StringDeserializer");
KafkaParams. Put (" value. The deserializer ", "org.apache.kafka.com mon. Serialization. StringDeserializer");
KafkaParams. Put (" group id ", "lingroup");
//kafkaParams. Put (auto. Offset. "reset", the "latest");
SetSwitchable viewer=new HashSet (a);
Switchable viewer. The add (KAFKA_TOPIC);
JavaPairInputDStreamThe stream=org. Apache. Spark. Streaming. Kafka. KafkaUtils.
CreateDirectStream (JSSC, String class, String class, StringDecoder. Class, StringDecoder. Class, kafkaParams, switchable viewer);
JavaPairDStreamTransDStream=stream. FlatMap (new FlatMapFunction String> () {
Public IteratorCall (Tuple2 & lt; String, String> T) throws the Exception {
Return Arrays. AsList (t) _2) split (" ")). The iterator ();
}
}). MapToPair (new PairFunction() {
Public Tuple2 & lt; String, Integer> Call (String t) throws the Exception {
Return new Tuple2 & lt; String, Integer> (t, 1);
}
}). ReduceByKey (new Function2 & lt; Integer, Integer, Integer> () {
Public Integer call (Integer v1, Integer v2) throws the Exception {
//this line of code does not perform
Print. The Print (v1);
//the line is not perform
System. The out. Println (" # # # # # # # "+ v1);
Return v1 + v2;
}
});
TransDStream. Print ();
file:///D:/checkpoint/JSSC. Checkpoint (" ");
JSSC. Start ();
JSSC. AwaitTermination ();
}
}
Public class Print {
Public static void print (Integer I) {
System. The out. Println (" -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- ");
System.out.println(i);
System. The out. Println (" -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- ");
}
}
CodePudding user response:
Are you sure any data is actually being received? Try debugging it locally to check.