The code is as follows:
/**
 * Consumes text messages from one or more Kafka topics and prints a sliding-window
 * word count for them.
 *
 * Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>
 *   - zkQuorum:   passed to `KafkaUtils.createStream` as the ZooKeeper quorum
 *   - group:      passed to `KafkaUtils.createStream` as the consumer group
 *   - topics:     comma-separated list of topic names to consume
 *   - numThreads: number of consumer threads to request for every topic
 *
 * NOTE(review): `SparkConf`, `StreamingContext`, `Seconds`, `Minutes`, `KafkaUtils` and
 * `StreamingExamples` are not defined in this snippet; they must be imported/in scope
 * from the surrounding file or project.
 */
object KafkaWordCount {

  /**
   * Program entry point.
   *
   * @param args command-line arguments: zkQuorum, group, topics, numThreads (see above).
   *             When fewer than four are supplied, a usage message is written to stderr
   *             and the JVM exits with status 1.
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    // Batches are produced every 2 seconds.
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    // The windowed reduce below is given an inverse function (`_ - _`); Spark requires
    // checkpointing to be enabled for that incremental form.
    ssc.checkpoint("checkpoint")

    // Every topic gets the same thread count. Names are trimmed so that both
    // "a,b" and "a, b" are accepted on the command line.
    val topicMap = topics.split(",").map(_.trim).map((_, numThreads.toInt)).toMap
    // createStream yields (key, message) pairs; only the message text is needed.
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    val words = lines.flatMap(_.split(" "))
    // Count each word over a 10-minute window that slides every 2 seconds: `_ + _` adds
    // counts entering the window, `_ - _` removes counts leaving it. The trailing 2 is
    // the number of partitions used for the reduce.
    val wordCounts = words
      .map(word => (word, 1L))
      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}