I'm trying to use Spark Streaming to consume data from a Kafka topic. The compiled program hangs and does nothing in the production environment, while everything runs fine in my virtual machine environment. The code is as follows:
package kafka

import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.zookeeper.data.ACL
import scala.collection.mutable.ListBuffer
import org.apache.zookeeper.ZooDefs
import org.slf4j.LoggerFactory
import scala.collection.JavaConversions._
object consumer_test {

  private val logger = LoggerFactory.getLogger(consumer_test.getClass)
  def readOffsets(topics: Seq[String], group: String, zkUtils: ZkUtils): Map[TopicPartition, Long] = {
    val topicPartOffsetMap = collection.mutable.HashMap.empty[TopicPartition, Long]
    val partitionMap = zkUtils.getPartitionsForTopics(topics)
    // ZooKeeper layout: /consumers/<groupId>/offsets/<topic>/<partition>
    partitionMap.foreach(topicPartitions => {
      val zkGroupTopicDirs = new ZKGroupTopicDirs(group, topicPartitions._1)
      topicPartitions._2.foreach(partition => {
        val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + partition
        try {
          val offsetStatTuple = zkUtils.readData(offsetPath)
          if (offsetStatTuple != null) {
            // An offset was committed for this partition: resume from it.
            topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)),
              offsetStatTuple._1.toLong)
          }
        } catch {
          // No offset node yet for this partition: start from offset 0.
          case e: Exception =>
            topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), 0L)
        }
      })
    })
    // println(topicPartOffsetMap.toMap.foreach(i => println(i._2)))
    topicPartOffsetMap.toMap
  }
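
  // Sketch of a debugging helper, not part of the original code: logs the
  // starting offsets recovered from ZooKeeper so they can be checked in the
  // driver log before the stream is wired up.
  def logRecoveredOffsets(offsets: Map[TopicPartition, Long]): Unit =
    offsets.foreach { case (tp, off) =>
      logger.warn(s"recovered ${tp.topic}-${tp.partition} -> $off")
    }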
  def persistOffsets(offsets: Seq[OffsetRange], group: String, storeEndOffset: Boolean, zkUtils: ZkUtils): Unit = {
    offsets.foreach(or => {
      val zkGroupTopicDirs = new ZKGroupTopicDirs(group, or.topic)
      val acls = new ListBuffer[ACL]()
      val acl = new ACL()
      acl.setId(ZooDefs.Ids.ANYONE_ID_UNSAFE)
      acl.setPerms(ZooDefs.Perms.ALL)
      acls += acl
      val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + or.partition
      // Store the batch's end offset, or its start offset if storeEndOffset is false.
      val offsetVal = if (storeEndOffset) or.untilOffset else or.fromOffset
      zkUtils.updatePersistentPath(offsetPath, offsetVal + "", acls.toList)
    })
  }
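
  // Sketch of a debugging helper, not part of the original code: reads one
  // offset back from ZooKeeper to confirm that persistOffsets actually wrote it.
  def verifyPersistedOffset(group: String, topic: String, partition: Int, zkUtils: ZkUtils): Unit = {
    val dirs = new ZKGroupTopicDirs(group, topic)
    val (data, _) = zkUtils.readData(dirs.consumerOffsetDir + "/" + partition)
    logger.warn(s"persisted offset for $topic-$partition: $data")
  }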
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("example").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    val sc = ssc.sparkContext
    sc.setLogLevel("WARN")

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.11.23:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "console-consumer-71817",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("cloudalarm2012")

    val zkUrl = "192.168.11.23:2181,192.168.11.24:2181,192.168.11.26:2181"
    val sessionTimeOut = 9999
    val connectionTimeOut = 9999
    val zkClientAndConnection = ZkUtils.createZkClientAndConnection(zkUrl, sessionTimeOut, connectionTimeOut)
    val zkUtils = new ZkUtils(zkClientAndConnection._1, zkClientAndConnection._2, false)
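
    // Sketch, not part of the original code: release the ZooKeeper connection
    // when the driver shuts down; zkUtils otherwise holds a live session.
    sys.addShutdownHook {
      zkUtils.close()
    }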
    val inputDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      // ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams,
        readOffsets(topics, kafkaParams("group.id").toString, zkUtils))
    )
    inputDStream.foreachRDD((rdd, batchTime) => {
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach(offset => {
        logger.warn(s"topic: ${offset.topic} - partition: ${offset.partition} - fromOffset: ${offset.fromOffset} - untilOffset: ${offset.untilOffset}")
      })
      val count = rdd.map(message => message.value()).count()
      logger.warn(s"count: $count")
      rdd.coalesce(1).foreach(println)
      // Persist the batch's end offsets back to ZooKeeper once per batch.
      persistOffsets(offsetRanges.toSeq, kafkaParams("group.id").toString, storeEndOffset = true, zkUtils)
    })

    ssc.start()
    // Without awaitTermination the driver's main thread returns right after
    // start() and the application exits.
    ssc.awaitTermination()
  }
}