Spark: how to convert a JavaDStream<String> into a JavaPairDStream?

Time:10-23

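import org.apache.spark.SparkConf;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class LinearRegression {

    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("JavaStreamingLinearRegressionWithSGDE");
        // One micro-batch every 10 seconds
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10L));

        // textFileStream monitors a directory for newly created files,
        // so this path should point at a directory
        JavaDStream<String> data = jssc.textFileStream("/LienarRegression/lpsa.data");
        // Each line looks like "label,f1 f2 ... f8"
        JavaDStream<LabeledPoint> parsedData = data.map(line -> {
            String[] parts = line.split(",");
            String[] features = parts[1].split(" ");
            double[] v = new double[features.length];
            for (int i = 0; i < features.length; i++) {
                v[i] = Double.parseDouble(features[i]);
            }
            return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v));
        });
        parsedData.cache();

        JavaDStream<String> data_1 = jssc.textFileStream("/LienarRegression/lpsa_1.data");
        // TODO: convert data_1 (a JavaDStream) into a JavaPairDStream for predictOnValues

        int numIterations = 3;
        int numFeatures = 8; // each lpsa.data line has 8 features
        // The initial weight vector needs one entry per feature, not one per iteration
        StreamingLinearRegressionWithSGD model = new StreamingLinearRegressionWithSGD()
                .setInitialWeights(Vectors.zeros(numFeatures))
                .setNumIterations(numIterations);

        model.trainOn(parsedData);
        model.latestModel();
        // model.predictOnValues(...);

        jssc.start();
        jssc.awaitTermination();
    }
}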


Sample input line: 2.5687881,1.16902610257751 0.855491905752846 2.03274448152093 1.22628985326088 1.89254797819741 2.02833774827712 3.11219574032972 2.68112551007152 (the label is separated from the features by a comma, and the features are separated by spaces)
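If a line ever has a space after the comma, splitting the feature part on a single space yields an empty first token, and Double.parseDouble("") throws a NumberFormatException. The sketch below is a minimal plain-Java version of the per-line parsing (no Spark), assuming the lpsa.data layout "label,f1 f2 ... fn"; LpsaLineParser and Parsed are hypothetical names, with Parsed standing in for MLlib's LabeledPoint. It trims and splits on runs of whitespace so such lines still parse.

```java
import java.util.Arrays;

// Plain-Java sketch of the parsing done inside data.map(line -> ...).
// Assumes the lpsa.data layout "label,f1 f2 ... fn"; all names are hypothetical.
final class LpsaLineParser {

    // Stand-in for MLlib's LabeledPoint: one label plus its feature values.
    static final class Parsed {
        final double label;
        final double[] features;

        Parsed(double label, double[] features) {
            this.label = label;
            this.features = features;
        }
    }

    static Parsed parse(String line) {
        // Split only on the first comma: label on the left, features on the right.
        String[] parts = line.split(",", 2);
        // trim() plus "\\s+" tolerate a space after the comma or repeated spaces.
        String[] tokens = parts[1].trim().split("\\s+");
        double[] features = new double[tokens.length];
        for (int i = 0; i < tokens.length; i++) {
            features[i] = Double.parseDouble(tokens[i]);
        }
        return new Parsed(Double.parseDouble(parts[0].trim()), features);
    }

    public static void main(String[] args) {
        Parsed p = parse("2.5687881,1.16902610257751 0.855491905752846 2.03274448152093 "
                + "1.22628985326088 1.89254797819741 2.02833774827712 3.11219574032972 2.68112551007152");
        // Prints the label followed by the 8 feature values
        System.out.println(p.label + " " + Arrays.toString(p.features));
    }
}
```

Inside Spark the same body would go into the map lambda, returning new LabeledPoint(label, Vectors.dense(features)) instead of Parsed.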

I need to call predictOnValues, whose parameter is a JavaPairDStream. How do I convert the JavaDStream into a JavaPairDStream? Any advice or code would be appreciated.
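In Spark's Java API this conversion is a mapToPair call that turns every element into a scala.Tuple2. For example, parsedData.mapToPair(p -> new Tuple2<>(p.label(), p.features())) should give the JavaPairDStream<Double, Vector> that predictOnValues accepts, keyed by the true label so each prediction can be compared with it. If no key is needed, mapping the stream to its feature vectors and calling predictOn on the resulting JavaDStream<Vector> is an alternative. The sketch below does not use Spark: it imitates that per-element pairing on an ordinary List, with SimpleEntry standing in for Tuple2 and a hypothetical Point class standing in for LabeledPoint.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Imitates, on an in-memory List, the per-element pairing that mapToPair performs
// on a DStream. SimpleEntry stands in for scala.Tuple2; Point is a hypothetical
// stand-in for MLlib's LabeledPoint. No Spark classes are used.
final class PairingSketch {

    static final class Point {
        final double label;
        final double[] features;

        Point(double label, double[] features) {
            this.label = label;
            this.features = features;
        }
    }

    // Key = true label (kept for comparing with the prediction later),
    // value = feature vector (what the model predicts from).
    static List<Map.Entry<Double, double[]>> toPairs(List<Point> points) {
        List<Map.Entry<Double, double[]>> pairs = new ArrayList<>();
        for (Point p : points) {
            pairs.add(new SimpleEntry<Double, double[]>(p.label, p.features));
        }
        return pairs;
    }

    public static void main(String[] args) {
        List<Point> points = Arrays.asList(
                new Point(2.5, new double[] {1.0, 2.0}),
                new Point(-0.4, new double[] {0.5, 0.1}));
        for (Map.Entry<Double, double[]> pair : toPairs(points)) {
            System.out.println(pair.getKey() + " -> " + Arrays.toString(pair.getValue()));
        }
    }
}
```

Keying by the label is a design choice, not a requirement: predictOnValues passes any key through unchanged, so a line number or timestamp would work just as well.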

Reply:

Something like textFile("x").map(x => (x, "1"))?

Reply:

Yes, but that is Scala; I have to write this in Java. Could you check whether this is right? parsedData is a JavaDStream<LabeledPoint>:
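// Vector here is org.apache.spark.mllib.linalg.Vector, not java.util.Vector
JavaPairDStream<Double, Vector> temp = parsedData.mapToPair(new PairFunction<LabeledPoint, Double, Vector>() {
    @Override
    public Tuple2<Double, Vector> call(LabeledPoint p) {
        return new Tuple2<Double, Vector>(p.label(), p.features());
    }
});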

Reply:

I was using textFileStream before; now I use socketTextStream to receive data from a port. This is what I wrote; could you check whether anything is wrong with it? I have only just started with Spark. The test data is the lpsa.data file from the Spark repository on GitHub (https://github.com/apache/spark/blob/master/data/mllib/ridge-data/lpsa.data).

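import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

public class LinearRegression {

    public static void main(String[] args) {
        // local[2]: one thread for the socket receiver, one for processing
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("JavaStreamingLinearRegressionWithSGDETest");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(2L));
        JavaDStream<String> lines = jssc.socketTextStream("***.**.**.***", 9999);

        // Each line looks like "label,f1 f2 ... f8"
        JavaDStream<LabeledPoint> parsedData = lines.map(line -> {
            String[] parts = line.split(",");
            String[] features = parts[1].split(" ");
            double[] v = new double[features.length];
            for (int i = 0; i < features.length; i++) {
                v[i] = Double.parseDouble(features[i]);
            }
            return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v));
        });
        parsedData.cache();

        // Key = true label, value = feature vector
        JavaPairDStream<Double, Vector> temp = parsedData.mapToPair(new PairFunction<LabeledPoint, Double, Vector>() {
            @Override
            public Tuple2<Double, Vector> call(LabeledPoint p) {
                return new Tuple2<Double, Vector>(p.label(), p.features());
            }
        });

        int numIterations = 3;
        int numFeatures = 8; // each lpsa.data line has 8 features
        StreamingLinearRegressionWithSGD model = new StreamingLinearRegressionWithSGD()
                .setInitialWeights(Vectors.zeros(numFeatures))
                .setNumIterations(numIterations);

        model.trainOn(parsedData);
        model.latestModel();
        // Prints (true label, predicted value) pairs for each batch
        model.predictOnValues(temp).print();

        jssc.start();
        try {
            jssc.awaitTermination();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}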
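To read the output of predictOnValues(temp).print(): each (key, features) pair should come back as (key, prediction), where a linear model's prediction is the dot product of the learned weights with the features plus the intercept. This also shows why the initial weight vector must have one entry per feature: Vectors.zeros(numIterations) with numIterations = 3 yields 3 weights, while every lpsa.data line carries 8 features. The sketch below is plain Java (no Spark); PredictOnValuesSketch and its methods are hypothetical names, and the size check mirrors what, as far as I know, MLlib's own dot product enforces.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Spark) of the shape of predictOnValues' output:
// every (key, features) pair becomes (key, prediction), prediction = w . x + b.
final class PredictOnValuesSketch {

    static double predict(double[] weights, double intercept, double[] x) {
        // One weight per feature is required; MLlib's dot product is expected
        // to reject vectors of different sizes in the same way.
        if (weights.length != x.length) {
            throw new IllegalArgumentException(
                    "weights has " + weights.length + " entries but features has " + x.length);
        }
        double sum = intercept;
        for (int i = 0; i < x.length; i++) {
            sum += weights[i] * x[i];
        }
        return sum;
    }

    // The key passes through unchanged; only the value is replaced by a prediction.
    static <K> List<Map.Entry<K, Double>> predictOnValues(
            List<Map.Entry<K, double[]>> pairs, double[] weights, double intercept) {
        List<Map.Entry<K, Double>> out = new ArrayList<>();
        for (Map.Entry<K, double[]> pair : pairs) {
            double y = predict(weights, intercept, pair.getValue());
            out.add(new SimpleEntry<K, Double>(pair.getKey(), y));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<Double, double[]>> pairs = new ArrayList<>();
        pairs.add(new SimpleEntry<Double, double[]>(2.0, new double[] {1.0, 2.0}));
        // Prints the true label next to the model's prediction
        for (Map.Entry<Double, Double> e : predictOnValues(pairs, new double[] {0.5, 0.25}, 1.0)) {
            System.out.println("label=" + e.getKey() + " prediction=" + e.getValue());
        }
    }
}
```

With the label as the key, each printed pair lets you compare the true value with the model's current estimate batch by batch.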