Here is my code. It reads data from Kafka, converts it into a DataFrame, and then stores it in Elasticsearch.
Every time execution enters foreachRDD, a new SQL monitoring entry (a SQL tab) seems to be created on the Spark UI page. Is there something wrong with the way I wrote the code?
val logs = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet).map(_._2)
/*
 * Define the schema up front
 */
val host = StructField("host", StringType, true)
val handle = StructField("handle", StringType, true)
val host_handle = StructField("host_handle", StringType, true)
val timestemp = StructField("timestemp", StringType, true)
val time = StructField("time", IntegerType, true)
val fromip = StructField("fromip", StringType, true)
val schema = StructType(Array(host, handle, host_handle, timestemp, time, fromip))
logs.foreachRDD { rdd =>
  /*
   * Create the SQLContext
   */
  val sqlContext = new SQLContext(rdd.sparkContext)
  /*
   * Build the DataFrame
   */
  if (!rdd.partitions.isEmpty) {
    val rowRDD = rdd.map(_.split(" ")).map(p =>
      Row(p(1),
        p(8).substring(1, p(8).length()),
        p(1) + "_" + p(8).substring(1, p(8).length()),
        StringTodate(p(5).substring(1, p(5).length())) + p(6).substring(0, p(6).length() - 1),
        p(20).toInt,
        p(2).substring(1, p(2).length())))
    sqlContext.createDataFrame(rowRDD, schema).saveToEs(esResource)
  }
  sqlContext.clearCache()
}
ssc.start()
ssc.awaitTermination()
CodePudding user response:
Does anyone know?
CodePudding user response:
logs.foreachRDD { rdd =>
  /*
   * Create the SQLContext
   */
  val sqlContext = new SQLContext(rdd.sparkContext)
  /*
   * Build the DataFrame
   */
  if (!rdd.partitions.isEmpty) {
    val rowRDD = rdd.map(_.split(" ")).map(p =>
      Row(p(1),
        p(8).substring(1, p(8).length()),
        p(1) + "_" + p(8).substring(1, p(8).length()),
        StringTodate(p(5).substring(1, p(5).length())) + p(6).substring(0, p(6).length() - 1),
        p(20).toInt,
        p(2).substring(1, p(2).length())))
    sqlContext.createDataFrame(rowRDD, schema).saveToEs(esResource)
  }
You create a new SQLContext for every RDD, so of course the web UI shows a lot of SQL tabs. You can use a singleton SQLContext instead.
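A rough sketch of what that could look like, assuming the Spark 1.x SQLContext API and the elasticsearch-hadoop Spark SQL integration used in the question. The names `SQLContextSingleton`, `SaveLogs`, and `parseLine` are made up for illustration; `parseLine` stands in for the Row-building logic from the original code.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.StructType
import org.apache.spark.streaming.dstream.DStream
import org.elasticsearch.spark.sql._ // provides saveToEs on DataFrame

// Lazily created singleton (hypothetical helper name): the SQLContext is
// built on first use and the same instance is returned for every later batch.
object SQLContextSingleton {
  @transient private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = synchronized {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}

object SaveLogs {
  // parseLine is a hypothetical stand-in for the split/substring logic
  // that turns one log line into a Row matching the schema.
  def save(logs: DStream[String],
           schema: StructType,
           esResource: String,
           parseLine: String => Row): Unit = {
    logs.foreachRDD { rdd =>
      if (!rdd.partitions.isEmpty) {
        // Reuse the single SQLContext instead of calling `new SQLContext` per batch
        val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
        val rowRDD = rdd.map(parseLine)
        sqlContext.createDataFrame(rowRDD, schema).saveToEs(esResource)
      }
    }
  }
}
```

If you are on Spark 1.5 or later, `SQLContext.getOrCreate(rdd.sparkContext)` should give you the same reuse without writing the helper object yourself.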
CodePudding user response: