Spark SQL application: reading a 1.7-million-line TXT file is slow

Time:02-27

Hi everyone. I've been teaching myself Spark, and I found that doing data analysis with Spark SQL really suits me. So I worked through the logic on my own and wrote the following code:

import com.twitter.chill.Base64;
import com.yitian.bankPay.spark.common.DateList;
import com.yitian.bankPay.spark.common.HBaseUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.*;

public class CustomerStatistices {

    public static List<Map<String, Object>> niubi() {
        SparkConf sparkConf = new SparkConf();
        sparkConf.setMaster("local").setAppName("json");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        SQLContext sqlContext = new SQLContext(sc);
        try {
            // read the TXT file
            JavaRDD<String> lineRDD = sc.textFile("F:\\Download\\111.txt");
            // convert each line read from the file (a String) into Spark's Row type
            JavaRDD<Row> rowRDD = lineRDD.map(new Function<String, Row>() {
                @Override
                public Row call(String s) throws Exception {
                    String[] strings = s.split("\t");
                    return RowFactory.create(strings[0], strings[1], strings[2], strings[3], strings[4]);
                }
            });
            // cache the rows
            rowRDD = rowRDD.cache();
            // convert rowRDD into a Spark SQL Dataset
            List<StructField> list = new ArrayList<>();
            list.add(DataTypes.createStructField("hms", DataTypes.StringType, true));
            list.add(DataTypes.createStructField("id", DataTypes.StringType, true));
            list.add(DataTypes.createStructField("name", DataTypes.StringType, true));
            list.add(DataTypes.createStructField("pg", DataTypes.StringType, true));
            list.add(DataTypes.createStructField("url", DataTypes.StringType, true));
            StructType structType = DataTypes.createStructType(list);
            Dataset<Row> dataset = sqlContext.createDataFrame(rowRDD, structType);
            // register the dataset as a temporary table so it can be queried as "table"
            dataset.registerTempTable("table");
            // query the temp table and collect the organized results
            Dataset<Row> ds1 = sqlContext.sql("select sum(case when SUBSTRING(hms,1,2) >= 0 and SUBSTRING(hms,1,2) < 12 then 1 else 0 end) amnum,\n" +
                    "sum(case when SUBSTRING(hms,1,2) >= 12 and SUBSTRING(hms,1,2) < 23 then 1 else 0 end) pmnum from table");
            List<Row> rowList = ds1.toJavaRDD().collect();
            List<Map<String, Object>> result = new ArrayList<>();
            for (Row r : rowList) {
                Map<String, Object> map = new HashMap<>();
                map.put("morning", r.getLong(0));
                map.put("afternoon", r.getLong(1));
                result.add(map);
            }
            Dataset<Row> ds2 = sqlContext.sql("select SUBSTRING(hms,1,2) h, count(*) num from table group by SUBSTRING(hms,1,2)");
            rowList = ds2.toJavaRDD().collect();
            for (Row r : rowList) {
                Map<String, Object> map = new HashMap<>();
                map.put("time", r.getString(0));
                map.put("num", r.getLong(1));
                result.add(map);
            }
            Dataset<Row> ds3 = sqlContext.sql("select sum(case when SUBSTRING(hms,1,2) >= 0 and SUBSTRING(hms,1,2) < 12 then 1 else 0 end) amnum,\n" +
                    "sum(case when SUBSTRING(hms,1,2) >= 12 and SUBSTRING(hms,1,2) < 23 then 1 else 0 end) pmnum from table");
            rowList = ds3.toJavaRDD().collect();
            for (Row r : rowList) {
                Map<String, Object> map = new HashMap<>();
                map.put("morning", r.getLong(0));
                map.put("afternoon", r.getLong(1));
                result.add(map);
            }
            Dataset<Row> ds4 = sqlContext.sql("select SUBSTRING(hms,1,2) h, count(*) num from table group by SUBSTRING(hms,1,2)");
            rowList = ds4.toJavaRDD().collect();
            for (Row r : rowList) {
                Map<String, Object> map = new HashMap<>();
                map.put("time", r.getString(0));
                map.put("num", r.getLong(1));
                result.add(map);
            }
            Dataset<Row> ds5 = sqlContext.sql("select sum(case when SUBSTRING(hms,1,2) >= 0 and SUBSTRING(hms,1,2) < 12 then 1 else 0 end) amnum,\n" +
                    "sum(case when SUBSTRING(hms,1,2) >= 12 and SUBSTRING(hms,1,2) < 23 then 1 else 0 end) pmnum from table");
            rowList = ds5.toJavaRDD().collect();
            // (the original post is truncated here; the close-out below is an
            // assumed minimal completion following the same pattern as ds1/ds3)
            for (Row r : rowList) {
                Map<String, Object> map = new HashMap<>();
                map.put("morning", r.getLong(0));
                map.put("afternoon", r.getLong(1));
                result.add(map);
            }
            return result;
        } catch (Exception e) {
            e.printStackTrace();
            return Collections.emptyList();
        } finally {
            sc.close();
        }
    }
}
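
For what it's worth, most of the runtime here likely comes from repeated work: ds3 and ds5 re-run ds1's query, ds4 re-runs ds2's, each collect() launches a separate job, and only the RDD is cached, not the DataFrame the SQL actually scans. Below is a minimal sketch, assuming the same five-column schema as above, that caches the DataFrame and runs each distinct query exactly once (the class and method names here are made up for illustration, not from the original post):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CustomerStatisticsOnce {

    // dataset is assumed to be the five-column DataFrame built above
    public static List<Map<String, Object>> stats(SQLContext sqlContext, Dataset<Row> dataset) {
        // cache the DataFrame itself so each SQL query reuses the scanned data
        dataset.cache();
        dataset.registerTempTable("table");

        List<Map<String, Object>> result = new ArrayList<>();

        // single pass: morning/afternoon totals
        Row amPm = sqlContext.sql(
                "select sum(case when SUBSTRING(hms,1,2) >= 0 and SUBSTRING(hms,1,2) < 12 then 1 else 0 end) amnum, " +
                "sum(case when SUBSTRING(hms,1,2) >= 12 and SUBSTRING(hms,1,2) < 23 then 1 else 0 end) pmnum from table")
                .collectAsList().get(0);
        Map<String, Object> split = new HashMap<>();
        split.put("morning", amPm.getLong(0));
        split.put("afternoon", amPm.getLong(1));
        result.add(split);

        // single pass: per-hour counts
        for (Row r : sqlContext.sql(
                "select SUBSTRING(hms,1,2) h, count(*) num from table group by SUBSTRING(hms,1,2)")
                .collectAsList()) {
            Map<String, Object> map = new HashMap<>();
            map.put("time", r.getString(0));
            map.put("num", r.getLong(1));
            result.add(map);
        }
        return result;
    }
}

If the same numbers are needed several times, reusing the collected lists instead of re-issuing the SQL should cut the number of full scans of the 1.7-million-line file from five to roughly one after the cache is populated.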