Help needed: a Flink real-time computing demo in Scala that summarizes commodity sales

Time:09-16

Data format: a simulation script generates real-time JSON records with an id and a price, for example {"id": 18, "price": 123}. I am looking for a Scala demo that uses Flink to compute the following results in real time:
1. grouped by id, the total sales of each product;
2. grouped by id, the accumulated sales of each product per minute;
3. the total sales across all products.
Thank you very much. I am a beginner and have been stuck on this for a week.
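To pin down what the three results mean before any streaming engine is involved, here is a minimal plain-Scala sketch that computes them over an in-memory batch. The `Order` case class, its `timestampMs` field, and the `OrderTotals` object are hypothetical names introduced only for illustration; the post itself only specifies an id and a price per record.

```scala
// Hypothetical record type: the post only specifies an id and a price.
// timestampMs is an assumed event time in milliseconds, used for the per-minute grouping.
case class Order(id: String, price: Long, timestampMs: Long)

object OrderTotals {
  // 1. Total sales per product id.
  def totalPerId(orders: Seq[Order]): Map[String, Long] =
    orders.groupBy(_.id).map { case (id, os) => id -> os.map(_.price).sum }

  // 2. Sales per product id within each one-minute bucket.
  //    The key is (minute index since epoch, product id).
  def perMinutePerId(orders: Seq[Order]): Map[(Long, String), Long] =
    orders
      .groupBy(o => (o.timestampMs / 60000L, o.id))
      .map { case (key, os) => key -> os.map(_.price).sum }

  // 3. Total sales across all products.
  def grandTotal(orders: Seq[Order]): Long =
    orders.map(_.price).sum
}
```

A streaming job maintains the same three aggregates incrementally instead of recomputing them over a finished collection.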

Spark Streaming produces these results with the code below. How can the same be done in Flink?
// Imports assumed from the calls used below: fastjson for JSON.parseObject and the
// Kafka 0.8 direct-stream API for KafkaUtils/StringDecoder.
import com.alibaba.fastjson.JSON
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object OrderConsumer {
  // Redis configuration
  val dbIndex = 0
  // Total sales of each product
  val orderTotalKey = "app::order::total"
  // Sales of each product in the last minute
  val oneMinTotalKey = "app::order::product"
  // Total sales
  val totalKey = "app::order::all"

  def main(args: Array[String]): Unit = {

    // Create a StreamingContext with a batch interval of 1 second
    val conf = new SparkConf().setMaster("local").setAppName("UserClickCountStat")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Kafka configuration
    val topics = Set("order")
    val brokers = "127.0.0.1:9092"
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "serializer.class" -> "kafka.serializer.StringEncoder")

    // Create a direct stream
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    // Parse the JSON payload (the value of each Kafka record)
    val events = kafkaStream.flatMap(line => Some(JSON.parseObject(line._2)))

    // Group by id and compute the record count and the sum of prices
    val orders = events
      .map(x => (x.getString("id"), x.getLong("price")))
      .groupByKey()
      .map(x => (x._1, x._2.size, x._2.reduceLeft(_ + _)))

    // Output
    orders.foreachRDD(x =>
      x.foreachPartition(partition =>
        partition.foreach(x => {

          println("id=" + x._1 + " count=" + x._2 + " price=" + x._3)

          // Save to Redis. RedisClient is a helper object not shown in this post;
          // it is assumed to expose a JedisPool as `pool`.
          val jedis = RedisClient.pool.getResource
          jedis.select(dbIndex)
          // Accumulate the sales of each product
          jedis.hincrBy(orderTotalKey, x._1, x._3)
          // Sales of each product in the last minute
          jedis.hset(oneMinTotalKey, x._1.toString, x._3.toString)
          // Accumulate the total sales
          jedis.incrBy(totalKey, x._3)
          RedisClient.pool.returnResource(jedis)

        })
      ))

    ssc.start()
    ssc.awaitTermination()
  }

}

thank you
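One possible shape of a Flink version is sketched below, assuming the Flink 1.x Scala DataStream API (`org.apache.flink.streaming.api.scala`). It is a starting point under stated assumptions, not a drop-in replacement for the Spark job: the `Order` case class and the `OrderStatsFlinkSketch` object are hypothetical names, the `fromElements` source stands in for a Kafka source (whose connector class depends on the Flink version), and results are printed instead of being written to Redis.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical record type: the post only specifies an id and a price per event.
case class Order(id: String, price: Long)

object OrderStatsFlinkSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Placeholder source (assumption). A real job would read the "order" topic
    // from Kafka and parse each JSON record into an Order.
    val orders: DataStream[Order] =
      env.fromElements(Order("18", 123L), Order("18", 77L), Order("19", 50L))

    // 1. Running total of sales per product id.
    orders
      .keyBy(_.id)
      .sum("price")
      .print()

    // 2. Sales per product id in one-minute tumbling processing-time windows.
    //    With the short bounded placeholder source the job may finish before
    //    a window fires; an unbounded source emits one result per id per minute.
    orders
      .keyBy(_.id)
      .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
      .sum("price")
      .print()

    // 3. Running grand total: route every price to one constant key and reduce.
    orders
      .map(_.price)
      .keyBy(_ => "all")
      .reduce(_ + _)
      .print()

    env.execute("order-stats-sketch")
  }
}
```

Keying every record by a constant in step 3 sends the whole stream through a single parallel task, which is acceptable for a demo but becomes a bottleneck at high volume. To persist results the way the Spark job does, each `print()` could be replaced with a sink that writes to Redis.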