import org.apache.spark.SparkConf;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class LinearRegression {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("JavaStreamingLinearRegressionWithSGDE");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10L));
        JavaDStream<String> data = jssc.textFileStream("/LienarRegression/lpsa.data");
        // parse each line "label,feature1 feature2 ..." into a LabeledPoint
        JavaDStream<LabeledPoint> parsedData = data.map(line -> {
            String[] parts = line.split(",");
            String[] features = parts[1].trim().split(" ");
            double[] v = new double[features.length];
            for (int i = 0; i < features.length; i++) {
                v[i] = Double.parseDouble(features[i]);
            }
            return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v));
        });
        parsedData.cache();
        JavaDStream<String> data_1 = jssc.textFileStream("/LienarRegression/lpsa_1.data"); // not used yet
        int numIterations = 3;
        // note: the initial weight vector length should match the number of features, not the iteration count
        StreamingLinearRegressionWithSGD model = new StreamingLinearRegressionWithSGD()
                .setInitialWeights(Vectors.zeros(numIterations));
        model.trainOn(parsedData);
        model.latestModel();
        // model.predictOnValues();
    }
}
Data source example (the label comes first, separated by a comma; the features are separated by spaces): 2.5687881,1.16902610257751 0.855491905752846 2.03274448152093 1.22628985326088 1.89254797819741 2.02833774827712 3.11219574032972 2.68112551007152
I want to call the predictOnValues method; its parameter is a JavaPairDStream.
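Roughly what I'm aiming for is the sketch below; pairStream is just a placeholder name for the (label, features) pair stream I don't yet know how to build:

    // sketch only: pairStream is assumed to be a JavaPairDStream<Double, Vector>
    // of (label, features) pairs built from parsedData above
    JavaPairDStream<Double, Double> predictions = model.predictOnValues(pairStream);
    predictions.print(); // each batch prints (label, predicted value) pairs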
CodePudding user response:
You mean something like textFile("x").map(x => (x, "1"))?
CodePudding user response:
Yes, but that's Scala; I have to write it in Java. Does this look right to you? parsedData is a JavaDStream<LabeledPoint>:
JavaPairDStream<Double, Vector> temp = parsedData.mapToPair(new PairFunction<LabeledPoint, Double, Vector>() {
    public Tuple2<Double, Vector> call(LabeledPoint p) {
        return new Tuple2<Double, Vector>(p.label(), p.features());
    }
});
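If a lambda is acceptable, I think the same mapping can also be written in one line (just a sketch of what I believe is equivalent):

    // equivalent lambda form (sketch): the pair types are inferred from the assignment
    JavaPairDStream<Double, Vector> temp = parsedData.mapToPair(p -> new Tuple2<>(p.label(), p.features()));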
CodePudding user response:
I was using textFileStream before; now I've switched to socketTextStream to receive data from a port. This is what I wrote; could you take a look and see whether anything is wrong? I've only just started working with Spark. The test data is the lpsa.data test file from the Spark GitHub repo (https://github.com/apache/spark/blob/master/data/mllib/ridge-data/lpsa.data).
public static void main(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("JavaStreamingLinearRegressionWithSGDETest");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(2L));
    JavaDStream<String> lines = jssc.socketTextStream("***.**.**.***", 9999);
    // parse each line "label,feature1 feature2 ..." into a LabeledPoint
    JavaDStream<LabeledPoint> parsedData = lines.map(line -> {
        String[] parts = line.split(",");
        String[] features = parts[1].trim().split(" ");
        double[] v = new double[features.length];
        for (int i = 0; i < features.length; i++) {
            v[i] = Double.parseDouble(features[i]);
        }
        return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v));
    });
    parsedData.cache();
    // (label, features) pairs to feed predictOnValues
    JavaPairDStream<Double, Vector> temp = parsedData.mapToPair(new PairFunction<LabeledPoint, Double, Vector>() {
        public Tuple2<Double, Vector> call(LabeledPoint p) {
            return new Tuple2<Double, Vector>(p.label(), p.features());
        }
    });
    int numIterations = 3;
    // note: the initial weight vector length should match the number of features (8 for lpsa.data)
    StreamingLinearRegressionWithSGD model = new StreamingLinearRegressionWithSGD()
            .setInitialWeights(Vectors.zeros(numIterations));
    model.trainOn(parsedData);
    model.latestModel();
    model.predictOnValues(temp).print();
    jssc.start();
    try {
        jssc.awaitTermination();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
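One more thing about model.latestModel(): calling it on its own doesn't print anything, so to watch the weights evolve I was thinking of adding something like the sketch below (a hypothetical addition, registered before jssc.start(); not part of the code above):

    // sketch: print the current weights once per batch; the foreachRDD body runs on the driver,
    // so latestModel() reflects whatever has been trained so far
    parsedData.foreachRDD(rdd -> {
        if (!rdd.isEmpty()) {
            System.out.println("current weights: " + model.latestModel().weights());
        }
    });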