Spark Streaming reduceByKeyAndWindow question, asking the experts

Time:10-11

Setup: data is read from Kafka and processed with Spark Streaming. I produce a single record in Kafka, and the job slides every 2 seconds over the most recent 10 seconds of data. But reduceByKeyAndWindow keeps emitting that record forever; I can't understand this. In principle the record should survive in the output for at most 10 seconds.

The code is as follows, please take a look:

 
object AdsClick1 {
  def main(args: Array[String]): Unit = {

    val kafkaBrokerList = "192.168.1.114:9092,192.168.1.115:9092,192.168.1.116:9092"
    val groupId = "SparkStreaming-ads-test"
    val topicSet = Set[String]("ads_test")
    val blacklist = "blacklist-test"
    val kafkaParamMap = Map[String, String](
      "group.id" -> groupId,
      "metadata.broker.list" -> kafkaBrokerList,
      "auto.offset.reset" -> "largest")
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName(s"${this.getClass.getSimpleName}")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.task.maxFailures", "16")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val streamingContext = new StreamingContext(sc, Seconds(2))
    streamingContext.checkpoint("checkpoint-ads")
    val directStream: InputDStream[(String, String)] =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaParamMap, topicSet)
    // Commit the consumed offsets back to Kafka after each batch
    directStream.foreachRDD(rdd => {
      val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      val kc = new KafkaCluster(kafkaParamMap)
      var offsetMap = Map[TopicAndPartition, Long]()
      for (offsets <- offsetsList) {
        val tp = TopicAndPartition(offsets.topic, offsets.partition)
        offsetMap += (tp -> offsets.untilOffset)
      }
      kc.setConsumerOffsets(groupId, offsetMap)
    })
    // Key by field 1, carry the remaining fields plus a count of 1
    val adsPairRDD: DStream[(String, (String, String, String, Integer))] = directStream.map(item => {
      val fields: Array[String] = item._2.split(",")
      (fields(1), (fields(0), fields(2), fields(3), 1))
    })
    // 10-second window, sliding every 2 seconds, with an inverse reduce function
    val reduceWindowRDD: DStream[(String, (String, String, String, Integer))] = adsPairRDD.reduceByKeyAndWindow(
      (a, b) => (a._1, a._2, a._3, a._4 + b._4)
      , (a, b) => (a._1, a._2, a._3, a._4 - b._4)
      , Seconds(10)
      , Seconds(2)
    )
    reduceWindowRDD.foreachRDD(rdd => {
      rdd.collect().foreach(item => {
        println(item)
      })
    })
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}




The printed results are as follows:

(cc, (1528037285, hangzhou, left, 1))
(cc, (1528037285, hangzhou, left, 1))
(cc, (1528037285, hangzhou, left, 1))
(cc, (1528037285, hangzhou, left, 1))
(cc, (1528037285, hangzhou, left, 1))
(cc, (1528037285, hangzhou, left, 0))   <= from here on the count is 0; there should be no data at all. I can't understand it.
(cc, (1528037285, hangzhou, left, 0))
(cc, (1528037285, hangzhou, left, 0))
(cc, (1528037285, hangzhou, left, 0))
...
The data just keeps on coming.

CodePudding user response:

Can any expert help me answer this?

CodePudding user response:

That's a lot of code; it makes me dizzy.

CodePudding user response:

Add a filter function to drop the entries whose value has fallen to 0.
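To expand on that: this is expected behavior of the inverse-reduce form of reduceByKeyAndWindow. Because Spark only subtracts departing values from the checkpointed window state, keys whose count has decayed to 0 are kept unless you tell Spark when to drop them. The overload of reduceByKeyAndWindow that also takes a filterFunc does exactly that; a minimal sketch against the code above (the numPartitions value of 2 is an arbitrary choice here):

```scala
val reduceWindowRDD: DStream[(String, (String, String, String, Integer))] =
  adsPairRDD.reduceByKeyAndWindow(
    (a, b) => (a._1, a._2, a._3, a._4 + b._4),  // add values entering the window
    (a, b) => (a._1, a._2, a._3, a._4 - b._4),  // subtract values leaving the window
    Seconds(10),                                // window length
    Seconds(2),                                 // slide interval
    2,                                          // numPartitions (arbitrary)
    (kv: (String, (String, String, String, Integer))) => kv._2._4 > 0  // drop keys whose count reached 0
  )
```

With the filter in place, the (cc, ..., 0) rows should stop being emitted once the record slides out of the 10-second window.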