Package com.gudeng.com merce. Gd;
Import org, apache hadoop. Fs. The Path;
The import org. Apache. Spark. SparkConf;
The import org. Apache. Spark. API. Java. The function. The function;
Import org. Apache. Spark. API. Java function. MapFunction;
The import org. Apache. Spark. SQL. The Dataset;
The import org. Apache. Spark. SQL. Encoders;
The import org. Apache. Spark. SQL. Row;
The import org. Apache. Spark. SQL. SaveMode;
The import org. Apache. Spark. SQL. SparkSession;
The import org. Apache. Spark. Streaming. Durations;
Import org. Apache. Spark, streaming API. Java. JavaPairDStream;
Import org. Apache. Spark, streaming API. Java. JavaPairInputDStream;
Import org. Apache. Spark, streaming API. Java. JavaStreamingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
The import com.gudeng.com merce. Gd. Conf. MySQLConf;
The import com.gudeng.com merce. Gd. Util. StockUtil;
/* *
* FileStream instance
* access to specified files and subdirectories of files in the directory
* @ author Administrator
*
*/
Public class SparkStreamingOnHDFSFileStreamDemon {
Private static Logger Logger=LoggerFactory. GetLogger (SparkStreamingOnHDFSFileStreamDemon. Class);
Private static MySQLConf mysql_stock=null;
Public static void main (String [] args) throws the Exception {
SparkConf conf=new SparkConf (). SetMaster (" local [*] "). The setAppName (" SparkStreamingOnHDFSFileStreamDemon ");
The conf. Set (" spark. Streaming. FileStream. MinRememberDuration ", "25920000 s");//300 days
The conf. Set (" spark. The scheduler. Mode ", "FAIR");
JavaStreamingContext JSSC=new JavaStreamingContext (conf, Durations. Seconds (5));
JSSC. Checkpoint (" HDFS://10.17.1.215:9000/sparkStreaming/checkpoint ");
Mysql_stock=new MySQLConf (" test ");
JavaPairInputDStream HDFS files.=JSSC fileStream (" ://10.17.1.215:9000/stockTxt/* ", String, class,
The String class, HtmlFileInputFormat. Class, new Function () {
Private static final long serialVersionUID=1700599882445141563 l;
Public Boolean call (Path Path) throws the Exception {
Logger. The info (" \ n files path. The getName: - & gt;" + path. The getName ());
Return path. GetName (). The endsWith (" _COPYING_ ")? False: true,
}
}, false);
JavaPairDStream AnotherFiles=files. MapValues (new Function () {
Private static final long serialVersionUID=4785162568501250065 l;
Public String call (String fPath) throws the Exception {
Logger. The info (" \ n anotherFiles fPath: - & gt;" + fPath);
DealFile (fPath);
Return fPath;
}
//processing a single file data
Public void dealFile (String path) throws the Exception {
SparkSession spark=SparkSession. Builder ()
Master (" local [*] ")
AppName (path)
Web.config (" spark. The scheduler. Mode ", "FAIR")
Web.config (" spark. SQL. Warehouse. Dir ", "file:///e:/spark-warehouse")//local debugging
GetOrCreate ();
Dataset Lines=spark. The read ()
. The format (" CSV ")
Option (" header ", "true")
Option (" charset ", "GBK")
. The load (" HDFS://10.17.1.215:9000 "+ path);//HDFS://mycluster stockTxt/2017-06-16. XLS
//data into a Bean
Dataset Lines1=lines. The map (new MapFunction () {
Private static final long serialVersionUID=- 3035418593858989490 l;
@ Override
Public DayDetails call (Row value) throws the Exception {
//logger. The info (" \ n Row: - & gt;" + value. The toString ());
String [] s=value. The toString (). The substring (1, the value. The toString (). The length () - 1). The split (" ");
DayDetails bean=new DayDetails ();
Bean. SetDealTime (s [0]);
Bean. SetPrice (Double. ParseDouble (s [1]));
Bean. SetPriceChage (" - ". Equals [2] (s)? 0 d: Double parseDouble (s [2]));
Bean. SetVol (Long. ParseLong (s [3]));
Bean. SetAmt (Long. ParseLong (s [4]));
Bean. SetType (StockUtil. GetType (s [5]));
Return the bean;
}
}, Encoders, bean (DayDetails. Class));
DealDetails lines1. CreateOrReplaceTempView (" ");
Dataset Lines2=spark. SQL (" select 1 as k, sum as amt (amt), sum (vol) as vol, count (1) as rows, Max (price) as high, min (price) as low from dealDetails ");
Dataset Lines3=spark. SQL (" select 1 as k, sum as amt_0 (amt), sum (vol) as vol_0 from dealDetails where type=0 ");
Dataset Lines4=spark. SQL (" select 1 as k, sum as amt_1 (amt), sum (vol) as vol_1 from dealDetails where type=1 ");
Dataset Lines5=spark. SQL (" select 1 as k, sum as amt_2 (amt), sum (vol) as vol_2 from dealDetails where type=2 ");
Lines2. CreateOrReplaceTempView (" t2 ");
Lines3. CreateOrReplaceTempView (" t3 ");
T4 lines4. CreateOrReplaceTempView (" ");
Lines5. CreateOrReplaceTempView (" 0 ");
StringBuilder sql=new StringBuilder();
SQL. Append (" select * ")
Append (" the from t2 ")
Append (" left the join t3 on t2. K=t3. K ")
Append (" left the join t4 on t2. K=t4. K ")
Append (" left the join t5 on t2. K=t5. K ")
Append (" ");
Dataset Lines6=spark. SQL (SQL. The toString ());
nullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnull