Single-machine run is faster than the cluster run: can someone explain why the Spark application runs so slowly?

Time:09-26

Because of a data-processing requirement I wrote a simple Spark application, thinking the processing speed would be greatly improved, but the result is hard for me to accept!

Spark cluster setup: three worker nodes, files stored on HDFS, about 2.5 GB of input data, and a running time of about 8 minutes.
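As a side note on that setup: with roughly 2.5 GB on HDFS, how much work the three workers actually get depends largely on how many partitions Spark creates when it reads the file. A small, purely illustrative snippet (standard Spark Java API; ctx and args are the same names used in the application below) for checking or raising that number:

// Illustrative only: see how many partitions the input file was split into
// (by default roughly one per HDFS block) and optionally ask for more splits.
JavaRDD<String> lines = ctx.textFile(args[0]);
System.out.println("input partitions: " + lines.partitions().size());

JavaRDD<String> moreSplits = ctx.textFile(args[0], 24);   // request at least 24 splits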
The Spark application is as follows:

package examples;

import java.util.regex.Pattern;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;
import scala.Tuple6;

public class Compression {

    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        if (args.length < 4) {
            System.err.println("Usage: Compression <file> <interval> <unused> <tag>");
            System.exit(1);
        }
        final Integer interval = Integer.valueOf(args[1]);
        SparkConf conf = new SparkConf().setAppName("Compression" + args[3]);
        JavaSparkContext ctx = new JavaSparkContext(conf);
        JavaRDD<String> lines = ctx.textFile(args[0]);

        // Each input line is "<order> <value>"; key every record by its segment
        // number (order / interval) and keep (order % interval, value) as the value.
        JavaPairRDD<Integer, Tuple2<Integer, Double>> key_v = lines.mapToPair(
            new PairFunction<String, Integer, Tuple2<Integer, Double>>() {
                @Override
                public Tuple2<Integer, Tuple2<Integer, Double>> call(String s) {
                    String[] x = SPACE.split(s);
                    Integer order = Integer.valueOf(x[0]);
                    Integer k = order / interval;
                    Tuple2<Integer, Double> v = new Tuple2<Integer, Double>(order % interval, Double.valueOf(x[1]));
                    return new Tuple2<Integer, Tuple2<Integer, Double>>(k, v);
                }
            });

        JavaPairRDD<Integer, Iterable<Tuple2<Integer, Double>>> segments = key_v.groupByKey();
        // JavaPairRDD<Integer, Tuple2<Integer, Double>> segments = key_v.distinct();

        // For each segment compute (start, end, max, min, avg, sample standard deviation).
        JavaPairRDD<Integer, Tuple6<Double, Double, Double, Double, Double, Double>> compressdata = segments.mapValues(
            new Function<Iterable<Tuple2<Integer, Double>>, Tuple6<Double, Double, Double, Double, Double, Double>>() {
                @Override
                public Tuple6<Double, Double, Double, Double, Double, Double> call(Iterable<Tuple2<Integer, Double>> list) throws Exception {
                    double max = Double.MIN_VALUE;   // note: Double.MIN_VALUE is the smallest positive double, so this assumes non-negative values
                    double min = Double.MAX_VALUE;
                    double total = 0.0;
                    double start = 0.0;
                    double end = 0.0;
                    double avg = 0.0;
                    double s = 0.0;
                    int len = 0;
                    for (Tuple2<Integer, Double> v : list) {
                        len++;
                        if (v._1.equals(0)) start = v._2;
                        total += v._2;
                        if (v._2 > max) max = v._2;
                        if (v._2 < min) min = v._2;
                    }
                    avg = total / len;
                    double temp = 0.0;
                    len -= 1;
                    for (Tuple2<Integer, Double> v : list) {
                        if (v._1.equals(len)) end = v._2;
                        temp += (v._2 - avg) * (v._2 - avg);
                    }
                    s = Math.sqrt(temp / len);
                    return new Tuple6<Double, Double, Double, Double, Double, Double>(start, end, max, min, avg, s);
                }
            });

        compressdata.saveAsHadoopFile("/SparkTest/CompressionResult" + args[3], Text.class, IntWritable.class, TextOutputFormat.class);

        System.exit(0);
    }

}
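One detail worth pointing out in the job above: groupByKey ships every (index, value) pair across the network before any statistics are computed. Purely as an illustration (this is not the original code, and it assumes Java 8 lambdas with a Spark version that provides aggregateByKey), the count/sum/min/max part of the per-segment summary could be combined on each node before the shuffle:

// Sketch only: per-segment count / sum / min / max with aggregateByKey.
// Accumulator layout: double[4] = {count, sum, min, max}. Partial arrays are
// merged within each partition first, so much less data is shuffled than with
// groupByKey. start, end, and the standard deviation would need extra fields
// (or a second pass), so this does not replace the full computation above.
JavaPairRDD<Integer, double[]> partialStats = key_v.aggregateByKey(
    new double[]{0.0, 0.0, Double.MAX_VALUE, -Double.MAX_VALUE},
    (acc, v) -> {                       // fold one (index, value) pair into a partial result
        acc[0] += 1;
        acc[1] += v._2;
        acc[2] = Math.min(acc[2], v._2);
        acc[3] = Math.max(acc[3], v._2);
        return acc;
    },
    (a, b) -> {                         // merge two partial results from different partitions
        a[0] += b[0];
        a[1] += b[1];
        a[2] = Math.min(a[2], b[2]);
        a[3] = Math.max(a[3], b[3]);
        return a;
    });

Whether something like that would help here depends on where the eight minutes actually go, which the post does not show.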


Running standalone on a single machine, the same processing takes only about 2 minutes:
package deal;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;

public class Compression {

    public static void main(String[] args) throws IOException {
        if (args.length < 3) {
            System.out.println("In, out, interval\n");
            System.exit(0);
        }
        // Append the start time of each run to records.txt.
        File record = new File("records.txt");
        BufferedWriter rw = new BufferedWriter(new FileWriter(record, true));

        Date dates = new Date();
        DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String timestart = format.format(dates);

        File filer = new File(args[0]);
        File filew = new File(args[1]);
        Integer interval = Integer.valueOf(args[2]);
        BufferedReader reader = null;
        BufferedWriter writer = null;
        try {
            reader = new BufferedReader(new FileReader(filer));
            writer = new BufferedWriter(new FileWriter(filew));
            String tempString = null;
            int line = 0;
            Onedata one = null;
            // ... (the rest of the program was cut off in the original post)