from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
import time

sc = SparkContext(master="local[2]", appName="StreamingWordCount")
# 5-second batch interval
ssc = StreamingContext(sc, 5)
# HBase table; it must be created in HBase in advance
table = 'flume'
# Kafka broker
broker = "192.168.159.148:9094"
# Kafka topic
topic = "hbasespark"
# HBase ZooKeeper quorum
hbaseZK = "192.168.159.148"
keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
hbaseConf = {"hbase.zookeeper.quorum": hbaseZK,
             "hbase.mapred.outputtable": table,
             "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
# print a timestamped log line
def log(s):
    t = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    print("[%s] %s" % (t, s))
# Format an RDD element; each element is a dict
def fmt_data(msg_dict):
    if msg_dict is not None:
        t = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        rowkey = t
        lst = []
        for d, x in msg_dict.items():
            col_name = d
            col_value = str(x)
            col_family = 'c1'
            msg_tuple = (rowkey, [rowkey, col_family, col_name, col_value])
            print("rowkey: " + rowkey + "\ndata: " + str(msg_tuple) + " append success")
            if msg_tuple is not None:
                lst.append(msg_tuple)
        return lst
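# For illustration (hypothetical input, not from the original post): a Kafka
# message {"name": "tom", "age": "18"} consumed at 2018-06-06 10:07:20 yields
#   ("2018-06-06 10:07:20", ["2018-06-06 10:07:20", "c1", "name", "tom"])
#   ("2018-06-06 10:07:20", ["2018-06-06 10:07:20", "c1", "age", "18"])
# i.e. the (rowkey, [row, columnFamily, qualifier, value]) shape that
# StringListToPutConverter turns into an HBase Put.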
# Process each RDD and write it to HBase
def connectAndWrite(data):
    if not data.isEmpty():
        # The elements received in the RDD have the format (None, [json string]),
        # so map over each tuple's second element and deserialize it into a dict
        msg_list = data.map(lambda x: json.loads(x[1]))
        # Print the RDD for inspection: a list whose elements are dict-like data
        log(msg_list.collect())
        try:
            # Format the RDD elements into the tuple format HBase needs for writing
            msg_row = msg_list.map(lambda x: fmt_data(x))
            # print(msg_row.flatMap(lambda x: x).map(lambda x: x).collect())
            # Flatten all the tuples in the RDD, then map and store to HBase
            msg_row.flatMap(lambda x: x).map(lambda x: x).saveAsNewAPIHadoopDataset(
                conf=hbaseConf, keyConverter=keyConv, valueConverter=valueConv)
            print("insert data success")
        except Exception as ex:
            print(str(ex) + " insert data failure")
kafkaStreams = KafkaUtils.createDirectStream(ssc, [topic],
                                             kafkaParams={"metadata.broker.list": broker})
# kafkaStreams.map(lambda x: x[1]).pprint()
kafkaStreams.foreachRDD(connectAndWrite)

log('start consumer')
ssc.start()
ssc.awaitTermination()
After running it reports:

18/06/06 10:07:20 INFO python.Converter: Loaded converter: org.apache.spark.examples.pythonconverters.StringListToPutConverter
An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsHadoopDataset.
: java.lang.NullPointerException

Where is the root cause of this error? Please advise.
CodePudding user response:
conf = SparkConf().set("spark.hadoop.validateOutputSpecs", False)
sc = SparkContext(master="local[2]", appName="StreamingWordCount", conf=conf)

Skip output validation: set the spark.hadoop.validateOutputSpecs parameter to False and create the SparkContext with the conf configured as above.
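A minimal sketch of how that fix slots into the driver setup from the question (same names as the original script; the SparkConf must be built before the SparkContext is created):

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

# Disable Spark's output-spec validation for Hadoop-based saves,
# as suggested above; the rest of the script is unchanged.
conf = SparkConf().set("spark.hadoop.validateOutputSpecs", False)
sc = SparkContext(master="local[2]", appName="StreamingWordCount", conf=conf)
ssc = StreamingContext(sc, 5)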