Anomaly consuming Kafka topic data with Spark Streaming

Time: 09-18

I am trying to consume data from a Kafka topic with Spark Streaming. After building, the program gets stuck on the production environment and makes no progress, while everything works normally in the virtual machine environment. The code is as follows:
package kafka

import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.zookeeper.data.ACL

import scala.collection.mutable.ListBuffer
import org.apache.zookeeper.ZooDefs
import org.slf4j.LoggerFactory

import scala.collection.JavaConversions._

object consumer_test {

  private val logger = LoggerFactory.getLogger(consumer_test.getClass)

  // Read, for every partition of the given topics, the offset last saved in
  // ZooKeeper; a partition with no readable offset falls back to 0L.
  def readOffsets(topics: Seq[String], group: String, zkUtils: ZkUtils): Map[TopicPartition, Long] = {

    val topicPartOffsetMap = collection.mutable.HashMap.empty[TopicPartition, Long]
    val partitionMap = zkUtils.getPartitionsForTopics(topics)

    // /consumers/<groupId>/offsets/<topic>
    partitionMap.foreach(topicPartitions => {
      val zkGroupTopicDirs = new ZKGroupTopicDirs(group, topicPartitions._1)
      topicPartitions._2.foreach(partition => {
        val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + partition

        try {
          val offsetStatTuple = zkUtils.readData(offsetPath)
          if (offsetStatTuple != null) {
            topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)),
              offsetStatTuple._1.toLong)
          }
        } catch {
          case e: Exception => topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)),
            0L)
        }

      })
    })
    // println(topicPartOffsetMap.toMap.foreach(i => println(i._2)))
    topicPartOffsetMap.toMap
  }
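
  // For illustration only (values taken from main below): a call such as
  //   readOffsets(Seq("cloudalarm2012"), "console-consumer-71817", zkUtils)
  // yields a Map like Map(new TopicPartition("cloudalarm2012", 0) -> 42L, ...),
  // where 42L is a made-up stored offset and absent offsets come back as 0L.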


  // Persist one offset per OffsetRange back to ZooKeeper (the end offset when
  // storeEndOffset is true, otherwise the start offset).
  def persistOffsets(offsets: Seq[OffsetRange], group: String, storeEndOffset: Boolean, zkUtils: ZkUtils) = {

    offsets.foreach(or => {
      val zkGroupTopicDirs = new ZKGroupTopicDirs(group, or.topic)

      val acls = new ListBuffer[ACL]()
      val acl = new ACL()
      acl.setId(ZooDefs.Ids.ANYONE_ID_UNSAFE)
      acl.setPerms(ZooDefs.Perms.ALL)
      acls += acl

      val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + or.partition
      val offsetVal = if (storeEndOffset) or.untilOffset else or.fromOffset

      zkUtils.updatePersistentPath(zkGroupTopicDirs.consumerOffsetDir + "/" + or.partition,
        offsetVal + "", acls.toList)

    })
  }
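
  // Note: offsets end up as plain strings under
  // /consumers/<groupId>/offsets/<topic>/<partition>, written with the fully
  // open ANYONE_ID_UNSAFE/ALL ACL built above, so external tools can read them.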


  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("example").setMaster("local[*]")

    val ssc = new StreamingContext(sparkConf, Seconds(5))
    val sc = ssc.sparkContext
    sc.setLogLevel("WARN")
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.11.23:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "console-consumer-71817",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("cloudalarm2012")

    val zkUrl = "192.168.11.23:2181,192.168.11.24:2181,192.168.11.26:2181"
    val sessionTimeOut = 9999
    val connectionTimeOut = 9999

    val zkClientAndConnection = ZkUtils.createZkClientAndConnection(
      zkUrl,
      sessionTimeOut,
      connectionTimeOut
    )
    val zkUtils = new ZkUtils(zkClientAndConnection._1, zkClientAndConnection._2, false)

    // Start the stream from the offsets previously saved in ZooKeeper.
    val inputDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      // ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams,
        readOffsets(topics, kafkaParams.apply("group.id").toString, zkUtils))
    )



    inputDStream.foreachRDD((rdd, batchTime) => {
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach(offset => {
        logger.warn(s"topic: ${offset.topic} - partition: ${offset.partition} - fromOffset: ${offset.fromOffset} - untilOffset: ${offset.untilOffset}")
      })

      val count = rdd.map(message => message.value()).count()

      logger.warn(s"count: $count")
      rdd.coalesce(1).foreach(println)

      // Save the processed ranges so the next run resumes from here.
      persistOffsets(offsetRanges.toSeq, kafkaParams.apply("group.id").toString, true, zkUtils)
    })


    ssc.start()
    ssc.awaitTermination()  // keep the driver alive while the stream runs
  }
}
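
Since the job seems stuck in production, one sanity check is whether any offset ever reaches ZooKeeper. A minimal read-back sketch, assuming the zkUtils, group and topic values from main above and that partition 0 exists (readData returns a (data, stat) tuple, as readOffsets already relies on):

val dirs = new ZKGroupTopicDirs("console-consumer-71817", "cloudalarm2012")
// Throws if the node does not exist yet, i.e. nothing was ever persisted.
val (offsetStr, stat) = zkUtils.readData(dirs.consumerOffsetDir + "/0")
println(s"offset stored for partition 0: $offsetStr")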