Three servers: master, slave01, slave02. The Hadoop, HBase, and Spark clusters are all deployed across these three machines.
First, the code:
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Put, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}

object SparkOnHBase {
  def convertScanToString(scan: Scan) = {
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("FileAna")
      .setMaster("spark://master:7077")
      .set("spark.driver.host", "192.168.1.139")
      .setJars(List("/home/pang/woozoomws/spark-service.jar",
        "/home/pang/woozoomws/spark-service/lib/hbase/hbase-common-1.2.2.jar",
        "/home/pang/woozoomws/spark-service/lib/hbase/hbase-client-1.2.2.jar",
        "/home/pang/woozoomws/spark-service/lib/hbase/hbase-protocol-1.2.2.jar",
        "/home/pang/woozoomws/spark-service/lib/hbase/htrace-core-3.1.0-incubating.jar",
        "/home/pang/woozoomws/spark-service/lib/hbase/hbase-server-1.2.2.jar"))
    val sc = new SparkContext(conf)

    val hbaseConf = HBaseConfiguration.create()
    val jobConf = new JobConf(hbaseConf, this.getClass)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "MissionItem")

    def convert(triple: (Int, String, Int)) = {
      val p = new Put(Bytes.toBytes(triple._1))
      p.addColumn(Bytes.toBytes("data"), Bytes.toBytes("name"), Bytes.toBytes(triple._2))
      p.addColumn(Bytes.toBytes("data"), Bytes.toBytes("age"), Bytes.toBytes(triple._3))
      (new ImmutableBytesWritable, p)
    }

    // step 3: read RDD data from somewhere and convert
    val rawData = List((1, "lilei", 14), (2, "hanmei", 18), (3, "someone", 38))
    val localData = sc.parallelize(rawData).map(convert)

    // step 4: use `saveAsHadoopDataset` to save RDD to HBase
    localData.saveAsHadoopDataset(jobConf)
  }
}
When I run this directly from the Scala IDE, the job prints a few lines of log and then hangs for a long time after:
16/08/25 00:02:00 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.1.247:55383 (size: 28.2 KB, free: 366.3 MB)
From the web UI I can see a lot of the following messages:
16/08/25 22:50:23 INFO client.RpcRetryingCaller: Call exception, tries=10, retries=35, started=38411 ms ago, cancelled=false, msg=row 'MissionItem,,99999999999999' on table 'hbase:meta' at region=hbase:meta,,1.1588230740, hostname=master,43,16020147135643, seqNum=0
But if I use code like the following instead, there is no problem:
val hbaseConf = HBaseConfiguration.create()
val table = new HTable(hbaseConf, TableName.valueOf("MissionItem"))
for (i <- 0 until 100) {
  val put = new Put(Bytes.toBytes(String.valueOf(i)))
  put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("x"),
    Bytes.toBytes(String.valueOf(i)))
  table.put(put)
}
Can anyone tell me what is going on?
CodePudding user response:
My advice is to use the foreachPartition method and write the Puts to HBase directly.
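Roughly like this (a minimal, untested sketch assuming the HBase 1.x client API; `rawData` and `sc` are from the question):

import scala.collection.JavaConverters._
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

sc.parallelize(rawData).foreachPartition { iter =>
  // Create the connection inside the partition so it is built on the
  // executor; Connection and Table are not serializable.
  val conf = HBaseConfiguration.create()
  val conn = ConnectionFactory.createConnection(conf)
  val table = conn.getTable(TableName.valueOf("MissionItem"))
  try {
    val puts = iter.map { case (id, name, age) =>
      val p = new Put(Bytes.toBytes(id))
      p.addColumn(Bytes.toBytes("data"), Bytes.toBytes("name"), Bytes.toBytes(name))
      p.addColumn(Bytes.toBytes("data"), Bytes.toBytes("age"), Bytes.toBytes(age))
      p
    }.toList
    // One batched put per partition instead of one RPC per row.
    table.put(puts.asJava)
  } finally {
    // Closing the connection also releases its ZooKeeper session.
    table.close()
    conn.close()
  }
}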
CodePudding user response:
I ran into the same problem. Can you tell me whether you ever solved it?
CodePudding user response:
OK, there is now a more convenient API: add the hbase-spark dependency, generate HFiles with HBaseContext.bulkLoad, then import them into the HBase table with LoadIncrementalHFiles.doBulkLoad. Very, very fast!
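To make that flow concrete, here is a rough sketch assuming the hbase-spark module's HBaseContext.bulkLoad API; the staging directory path is illustrative, and the table and data reuse the question's example:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
import org.apache.hadoop.hbase.spark.{HBaseContext, KeyFamilyQualifier}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("BulkLoadSketch"))
val config = HBaseConfiguration.create()
val hbaseContext = new HBaseContext(sc, config)
val tableName = TableName.valueOf("MissionItem")
val stagingDir = "/tmp/hfiles-staging" // illustrative HDFS path

val rdd = sc.parallelize(List((1, "lilei", 14), (2, "hanmei", 18), (3, "someone", 38)))

// Step 1: write one HFile cell per (rowKey, family, qualifier) -> value.
hbaseContext.bulkLoad[(Int, String, Int)](rdd, tableName,
  { case (id, name, age) =>
    val row = Bytes.toBytes(id)
    Seq(
      (new KeyFamilyQualifier(row, Bytes.toBytes("data"), Bytes.toBytes("age")), Bytes.toBytes(age)),
      (new KeyFamilyQualifier(row, Bytes.toBytes("data"), Bytes.toBytes("name")), Bytes.toBytes(name))
    ).iterator
  },
  stagingDir)

// Step 2: hand the generated HFiles over to the region servers.
val conn = ConnectionFactory.createConnection(config)
try {
  val loader = new LoadIncrementalHFiles(config)
  loader.doBulkLoad(new Path(stagingDir), conn.getAdmin,
    conn.getTable(tableName), conn.getRegionLocator(tableName))
} finally {
  conn.close()
}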
CodePudding user response:
I also encountered this problem. After investigating, I found that each call to saveAsHadoopDataset leaves its ZooKeeper session unreleased, so the number of ZooKeeper sessions eventually reaches the maximum of 60 (the default). You can raise the maximum number of ZooKeeper connections, but the unreleased sessions are the fundamental problem! I still haven't found a solution; I don't think raising the ZooKeeper connection limit is the best fix.
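For reference, the connection cap mentioned above is ZooKeeper's maxClientCnxns setting (default 60 concurrent connections per client IP). Raising it in zoo.cfg is only a workaround while the sessions keep leaking; a sketch, assuming a standalone ZooKeeper configured via zoo.cfg:

# zoo.cfg on each ZooKeeper server (restart required).
# Default is 60 concurrent connections per client IP; 300 is an
# illustrative value, not a recommendation.
maxClientCnxns=300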
CodePudding user response: