Public class LoadDataToHBase {
Public static class LoadDataToHBaseMapper extends
Mapper {
Public static int y, m, d, h, n, s, mm;
Calendar CAL=Calendar. GetInstance ();
//the map with a key immutableBytesWritable type of meaningless key, the value of the map is directly to the original one row output,
//the map will shuffle and sort, after the completion of the key - value sorted by key, otherwise can't write into the hfile, hfile requirements after writing key can not be less than write key
Private ImmutableBytesWritable ImmutableBytesWritable=new ImmutableBytesWritable ();
Protected void map (LongWritable key, the Text value, Context Context)
Throws IOException, InterruptedException {
ImmutableBytesWritable. Set (Bytes. ToBytes (key. The get ()));
The context. Write (immutableBytesWritable, value);
}
}
//reducer for every time I get a map output value is one of the input file
//reducer key is meaningless, every value value is a hfile format output, including rowkey, family, the qualifier, timestamp, value
Public static class LoadDataToHBaseReducer extends
Reducer {
Public static int y, m, d, h, n, s, mm;
Calendar cal=Calendar.getInstance();
Protected void the reduce (ImmutableBytesWritable key Iterable Values,
The Context Context)
Throws IOException, InterruptedException {
The String value="";
While (values. The iterator (). HasNext ())
{
Value=(https://bbs.csdn.net/topics/values.iterator). Next (). The toString ();
If (the value!=null & & !" ". The equals (value))
{
List List=new ArrayList (a);
List=createKeyValue (value. The toString ());
Iterator It=list. The iterator ();
While (it. HasNext ()) {
KeyValue kv=new KeyValue ();
Kv=it. Next ();
If (kv!=null) {
Context. Write (key, kv);
}
}
}
}
}
Private List CreateKeyValue (String STR) {
List List=new ArrayList (a);
String [] values=STR. ToString (). The split (" | ");
String [] qualifiersName=CONSTANT. QualifiersName;
For (int I=1; I & lt; QualifiersName. Length; I++) {
Long timeStamp=System. CurrentTimeMillis ();
String rownum=values [0];
The String family=CONSTANT. FamilyName;
String the qualifier=qualifiersName [I];
String value_str=values [I];
Int y=CAL. Get (Calendar YEAR);
. Int j=CAL get (Calendar MONTH) + 1;
Int j=CAL. Get (Calendar. The DATE);
Int h=CAL. Get (Calendar. HOUR);
Int n=CAL. Get (Calendar. MINUTE);
Int s=CAL. Get (Calendar. SECOND);
Int mm=CAL. Get (Calendar. MILLISECOND);
String rowkey_str=timeStamp + "-" + Integer. ToString (y) + Integer. ToString (m) + "/" + Integer. ToString (d) +
Integer. The toString () h + Integer. ToString (n) + Integer. ToString (s) + "/" + Integer. ToString (mm) + rownum + "-" + values [4]
+ "-" + values [5] + "-" + values [6].
KeyValue kv=new KeyValue (Bytes. ToBytes (rowkey_str),
Bytes. ToBytes (family), Bytes. ToBytes (qualifier),
System.currenttimemillis (), Bytes. ToBytes (value_str));
if (i!=4 | | I!=5 | | I! {
=6)List. The add (kv);
}
}
return list;
}
}
Public static void main (String [] args) throws IOException,
InterruptedException, ClassNotFoundException {
The Configuration of the conf=HBaseConfiguration. The create ();
Job Job=new Job (conf, CONSTANT jobName);
Job. SetJarByClass (LoadDataToHBase. Class);
Job. SetOutputKeyClass (ImmutableBytesWritable. Class);
//note that the Text here. The class with the output of the map function key - value which corresponds to the number of the value of the
Job. SetOutputValueClass (Text. Class);
Job. SetMapperClass (LoadDataToHBaseMapper. Class);
Job. SetReducerClass (LoadDataToHBaseReducer. Class);
//job. SetOutputFormatClass (. Org. Apache hadoop, hbase graphs. HFileOutputFormat. Class);
Job. SetOutputFormatClass (HFileOutputFormat. Class);
//job. SetNumReduceTasks (4);
//job. SetPartitionerClass (. Org. Apache hadoop, hbase graphs. SimpleTotalOrderPartitioner. Class);
The Configuration fs_conf=new Configuration ();
FileSystem fs=FileSystem. Get (fs_conf);
String str_inPath=CONSTANT. Str_inPath;
String str_outPath=CONSTANT. Str_outPath;
//if the output path is deleted first, because do not allow the output path exists in advance
The Path outPath=new Path (str_outPath);
If (fs) exists (outPath))
Fs. Delete (outPath, true);
FileInputFormat. AddInputPath (job, new Path (str_inPath));
FileOutputFormat. SetOutputPath (job, new Path (str_outPath));
System. The exit (job. WaitForCompletion (true)? 0:1);
}
}
Public class CONSTANT {
Public static final String jobName="LoadDataToHBase";
Public static final String [] qualifiersName={" ", "01 _home", "04 _name", "05 _phone."
"07 _price", "08 _room", "09 _large", "10 _floor", "11 _n", "12 _site", "14 _compay"};
//public static final String [] qualifiersName={" ", "00 _url", "01 _home", "02 _what,"
//"03 _compay2", "04 _name", "05 _phone", "06 _title",
nullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnull