Spark Streaming + Kafka: cannot consume any data, please help, this is very urgent

Time:10-09

Spark version: 2.3.0
Kafka version: kafka_2.10-0.9.0.2.4.0.0-169
I need to maintain the offsets manually, but the job cannot consume any data. I would really appreciate any help, this is very urgent.

CodePudding user response:

The consumer code:
import java.io._
import com.unicom.view.MyUtils
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaMaintainOffset {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName(this.getClass.getName)
    val con = new SparkContext(conf)
    val ssc = new StreamingContext(con, Seconds(5))
    // ssc.checkpoint("D:\\html")

    val brokerList = "132.194.94.195:6667,132.194.94.197:6667,132.194.94.198:6667,132.194.94.199:6667,132.194.94.200:6667,132.194.94.201:6667,132.194.94.202:6667,132.194.94.203:6667"
    // val brokerList = "master:9092,slave1:9092,slave2:9092"
    val kafkaParam: Map[String, String] = Map[String, String](
      "metadata.broker.list" -> brokerList,
      "auto.offset.reset" -> "smallest",
      "enable.auto.commit" -> "false"
    )

    val fromOffset: Map[TopicAndPartition, Long] =
      MyUtils.getOffsets(Set("DLTE1", "DLTE4", "DLTE3", "DLTE2", "DMC", "DLTE5"))
    val messageHandler: MessageAndMetadata[String, String] => String =
      (x: MessageAndMetadata[String, String]) => x.message()
    val value2: InputDStream[String] =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
        ssc, kafkaParam, fromOffset, messageHandler)
    value2.foreachRDD(rdd => {
      // if the RDD is not empty, save its offsets to external storage (Redis)
      if (!rdd.isEmpty()) {
        val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        MyUtils.saveOffsetsToRedis(ranges)
      }
      /* rdd.foreachPartition(p => p.foreach(e => {
        var i = 0
        if (i % 100 == 0) {
          val string: String = e.toString
          println(string)
        }
        i = i + 1
      })) */
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
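One note on the symptom: when createDirectStream is seeded with explicit fromOffsets, the "auto.offset.reset" -> "smallest" setting is effectively ignored, so if the offsets restored from Redis have already been deleted from the brokers (Kafka drops old log segments after retention), the job can fail with offset-out-of-range errors or appear to consume nothing. Below is a minimal sketch of a hypothetical helper (not in the original post) that clamps the stored offsets into the range the brokers still hold, using the same KafkaCluster class that the tool class further down also uses:

import kafka.common.TopicAndPartition
import org.apache.spark.streaming.kafka.KafkaCluster

object OffsetClamp {
  // Clamp each stored offset into the [earliest, latest] range currently on the brokers,
  // so an expired offset cannot stall consumption.
  def clamp(kafkaParams: Map[String, String],
            stored: Map[TopicAndPartition, Long]): Map[TopicAndPartition, Long] = {
    val kc = new KafkaCluster(kafkaParams)
    val tps = stored.keySet
    val earliest = kc.getEarliestLeaderOffsets(tps).right.get.map { case (tp, lo) => tp -> lo.offset }
    val latest = kc.getLatestLeaderOffsets(tps).right.get.map { case (tp, lo) => tp -> lo.offset }
    stored.map { case (tp, off) => tp -> math.min(math.max(off, earliest(tp)), latest(tp)) }
  }
}

fromOffset could then be passed through OffsetClamp.clamp(kafkaParam, fromOffset) before createDirectStream is called.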


CodePudding user response:

The offset-maintenance tool class. If it throws errors, it may be a version problem: in Spark 2.3.0 the Kafka 0.8 integration is deprecated and only the 0.10 integration is properly supported (a 0.10-style sketch follows the tool class below).

import kafka.common.TopicAndPartition
import org.apache.spark.streaming.kafka.{KafkaCluster, OffsetRange}
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object MyUtils {
  private val config = new JedisPoolConfig
  // private val redisHost = "192.168.252.99"
  private val redisHost = "132.194.43.199"
  private val redisPort = 6379
  // maximum number of connections
  config.setMaxTotal(30)
  // maximum number of idle connections
  config.setMaxIdle(10)
  private val pool = new JedisPool(config, redisHost, redisPort, 10000)

  private val topicPrefix = "kafka:topic"

  private def getKey(topic: String, groupId: String = "", prefix: String = topicPrefix): String =
    s"$prefix:$topic:$groupId"

  private def getRedisConnection: Jedis = pool.getResource

  // read the saved offsets for the given topics from Redis
  private def getOffsetFromRedis(topics: Set[String], groupId: String = ""): Map[TopicAndPartition, Long] = {
    val jedis = getRedisConnection
    val offsets = for (topic <- topics) yield {
      import scala.collection.JavaConversions._
      jedis.hgetAll(getKey(topic, groupId)).toMap
        .map { case (partition, offset) => TopicAndPartition(topic, partition.toInt) -> offset.toLong }
    }
    jedis.close()
    offsets.flatten.toMap
  }

  // save offsets to Redis
  def saveOffsetsToRedis(ranges: Array[OffsetRange], groupId: String = ""): Unit = {
    val jedis = getRedisConnection
    val offsets = for (range <- ranges) yield {
      (range.topic, range.partition -> range.untilOffset)
    }
    val offsetsMap: Map[String, Map[Int, Long]] =
      offsets.groupBy(_._1).map { case (topic, buffer) => (topic, buffer.map(_._2).toMap) }

    for ((topic, partitionAndOffset) <- offsetsMap) {
      val fields = partitionAndOffset.map(elem => (elem._1.toString, elem._2.toString))
      import scala.collection.JavaConversions._
      jedis.hmset(getKey(topic, groupId), fields)
    }
    // return the connection to the pool (missing in the original, which leaks connections)
    jedis.close()
  }

  // for the given topics, get the maximum and minimum offset of every partition
  private def getMaxMinOffsets(topics: Set[String]): (Map[TopicAndPartition, Long], Map[TopicAndPartition, Long]) = {
    // with the given topics, build a Kafka connection
    val kafkaParams = Map("metadata.broker.list" -> "132.194.94.195:6667,132.194.94.197:6667,132.194.94.198:6667,132.194.94.199:6667,132.194.94.200:6667,132.194.94.201:6667,132.194.94.202:6667,132.194.94.203:6667")
    // val kafkaParams = Map("metadata.broker.list" -> "master:9092,slave1:9092,slave2:9092")
    val kc = new KafkaCluster(kafkaParams)
    // get the partition information
    // ... (the reply is truncated here)
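The reply is cut off at this point. A minimal sketch of how getMaxMinOffsets could finish, assuming the KafkaCluster API from spark-streaming-kafka-0-8 (getPartitions, getEarliestLeaderOffsets and getLatestLeaderOffsets each return an Either, and LeaderOffset carries the numeric offset):

    // resolve the partitions of the given topics, then ask the partition leaders
    // for their earliest (minimum) and latest (maximum) offsets
    val partitions = kc.getPartitions(topics).right.get
    val minOffsets = kc.getEarliestLeaderOffsets(partitions).right.get.map { case (tp, lo) => tp -> lo.offset }
    val maxOffsets = kc.getLatestLeaderOffsets(partitions).right.get.map { case (tp, lo) => tp -> lo.offset }
    (maxOffsets, minOffsets)
  }
}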
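On the version point above: Spark 2.3.0 deprecates the Kafka 0.8 integration, and spark-streaming-kafka-0-10 is the supported connector. Below is a minimal sketch of the same consumer on the 0.10 API; the group id is an assumption (the original post does not name one), and note that the 0.10 client generally requires brokers on Kafka 0.10 or newer, so the 0.9 broker from the question would need an upgrade first:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object Kafka010Consumer {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("Kafka010Consumer"), Seconds(5))
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "132.194.94.195:6667", // full broker list as above
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "dlte-consumer", // assumed group id, not in the original post
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Set("DLTE1", "DLTE2", "DLTE3", "DLTE4", "DLTE5", "DMC")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
    stream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        // the 0.10 stream can commit offsets back to Kafka itself
        stream.asInstanceOf[CanCommitOffsets].commitAsync(ranges)
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }
}

With this API the offsets are committed back to Kafka via commitAsync, which would replace the Redis bookkeeping in the tool class above.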