The code is as follows, please directly,
The object AdsClick1 {
Def main (args: Array [String]) : Unit={
Val kafkaBrokerList="192.168.1.114:9092192168 1.115:9092192168:1.116 9092"
Val groupId="SparkStreaming - ads - test"
Val topicSet=Set [String] (" ads_test ")
Val blacklist="blacklist - test"
Val kafkaParamMap=Map [String, String] (
"Group. Id" - & gt; GroupId,
Metadata. Broker. "the list" - & gt; KafkaBrokerList,
Auto. Offset. "reset" - & gt; "Largest"). ToMap
Val conf=new SparkConf ()
[*]. SetMaster (" local ")
SetAppName (s "${this. GetClass. GetSimpleName}")
Set (" spark serializer, "" org. Apache. Spark. Serializer. KryoSerializer")
Set (" spark. Task. MaxFailures ", "16")
Val sc=new SparkContext (conf)
Sc. SetLogLevel (" WARN ")
Val streamingContext=new streamingContext (sc, Seconds (2))
StreamingContext. Checkpoint (" checkpoint - ads ")
Val directStream: InputDStream [(String, String)]=KafkaUtils. CreateDirectStream [String, String, StringDecoder, StringDecoder] (streamingContext, kafkaParamMap, topicSet toSet)
DirectStream. ForeachRDD (RDD=& gt; {
Val offsetsList=RDD. AsInstanceOf [HasOffsetRanges] offsetRanges
Val kc=new KafkaCluster (kafkaParamMap)
Val offsetMap=Map [TopicAndPartition, Long] ()
For (offsets & lt; - offsetsList) {
Val tp=TopicAndPartition (offsets. The topic, offsets. Partition)
OffsetMap +=(tp - & gt; Offsets. UntilOffset)
}
Kc setConsumerOffsets (groupId, offsetMap toMap)
})
Val adsPairRDD: DStream [(String, String, String, String, Integer))]=directStream. The map (item=& gt; {
Val fields: Array [String]=item. _2. The split () ", "
(fields (1), (fields (0), fields (2), fields (3), 1))
})
Val reduceWindowRDD: DStream [(String, String, String, String, Integer))]=adsPairRDD. ReduceByKeyAndWindow (
(a, b)=& gt; (a. a. _1, _2, a. _3, a. _4 + b. _4)
, (a, b)=& gt; (a. a. _1, _2, a. _3, a. _4 - b. _4)
, Seconds (10)
, Seconds (2)
)
ReduceWindowRDD. ForeachRDD (RDD=& gt; {
RDD. Collect (). The foreach (item=& gt; {
Println (item)
})
})
StreamingContext. Start ()
StreamingContext. AwaitTermination ()
}
}
Print the results are as follows:
(cc, (1528037285, hangzhou, left, 1))
(cc, (1528037285, hangzhou, left, 1))
(cc, (1528037285, hangzhou, left, 1))
(cc, (1528037285, hangzhou, left, 1))
(cc, (1528037285, hangzhou, left, 1))
(cc, (1528037285, hangzhou, left, 0))="behind the data and the number is 0, there should be no data, can't understand
(cc, (1528037285, hangzhou, left, 0))
(cc, (1528037285, hangzhou, left, 0))
(cc, (1528037285, hangzhou, left, 0))
.
.
.
There has always been the data
CodePudding user response:
Have a master to help answer it for me?CodePudding user response:
Much, I feel dizzyCodePudding user response:
Add a filter function, data filter out v 0 is not to go